[
https://issues.apache.org/jira/browse/DRILL-1282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17432402#comment-17432402
]
ASF GitHub Bot commented on DRILL-1282:
---------------------------------------
dzamo commented on a change in pull request #2338:
URL: https://github.com/apache/drill/pull/2338#discussion_r733582283
##########
File path:
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
##########
@@ -78,209 +79,237 @@
*
*/
class AsyncPageReader extends PageReader {
- static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(AsyncPageReader.class);
+ static final Logger logger = LoggerFactory.getLogger(AsyncPageReader.class);
private ExecutorService threadPool;
private long queueSize;
private LinkedBlockingQueue<ReadStatus> pageQueue;
private ConcurrentLinkedQueue<Future<Void>> asyncPageRead;
private long totalPageValuesRead = 0;
- private Object pageQueueSyncronize = new Object(); // Object to use to
synchronize access to the page Queue.
+ private final Object pageQueueSyncronize = new Object(); // Object to use to
synchronize access to the page Queue.
// FindBugs complains if
we synchronize on a Concurrent Queue
- AsyncPageReader(ColumnReader<?> parentStatus, FileSystem fs, Path path,
- ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException {
- super(parentStatus, fs, path, columnChunkMetaData);
+ AsyncPageReader(ColumnReader<?> parentStatus, FileSystem fs, Path path)
throws ExecutionSetupException {
+ super(parentStatus, fs, path);
threadPool =
parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
queueSize = parentColumnReader.parentReader.readQueueSize;
pageQueue = new LinkedBlockingQueue<>((int) queueSize);
asyncPageRead = new ConcurrentLinkedQueue<>();
}
@Override
- protected void loadDictionaryIfExists(final ColumnReader<?> parentStatus,
- final ColumnChunkMetaData columnChunkMetaData, final
DirectBufInputStream f) throws UserException {
- if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
- try {
- assert(columnChunkMetaData.getDictionaryPageOffset() >=
dataReader.getPos() );
- long bytesToSkip = columnChunkMetaData.getDictionaryPageOffset() -
dataReader.getPos();
- while (bytesToSkip > 0) {
- long skipped = dataReader.skip(bytesToSkip);
- if (skipped > 0) {
- bytesToSkip -= skipped;
- } else {
- // no good way to handle this. Guava uses InputStream.available to
check
- // if EOF is reached and because available is not reliable,
- // tries to read the rest of the data.
- DrillBuf skipBuf = dataReader.getNext((int) bytesToSkip);
- if (skipBuf != null) {
- skipBuf.release();
- } else {
- throw new EOFException("End of File reached.");
- }
- }
- }
- } catch (IOException e) {
- handleAndThrowException(e, "Error Reading dictionary page.");
- }
- }
- }
-
- @Override protected void init() throws IOException {
+ protected void init() throws IOException {
super.init();
//Avoid Init if a shutdown is already in progress even if init() is called
once
if (!parentColumnReader.isShuttingDown) {
asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new
AsyncPageReaderTask(debugName, pageQueue)));
}
}
- private DrillBuf getDecompressedPageData(ReadStatus readStatus) {
- DrillBuf data;
- boolean isDictionary = false;
- synchronized (this) {
- data = readStatus.getPageData();
- readStatus.setPageData(null);
- isDictionary = readStatus.isDictionaryPage;
- }
- if (parentColumnReader.columnChunkMetaData.getCodec() !=
CompressionCodecName.UNCOMPRESSED) {
- DrillBuf compressedData = data;
- data = decompress(readStatus.getPageHeader(), compressedData);
- synchronized (this) {
- readStatus.setPageData(null);
- }
- compressedData.release();
- } else {
- if (isDictionary) {
- stats.totalDictPageReadBytes.addAndGet(readStatus.bytesRead);
- } else {
- stats.totalDataPageReadBytes.addAndGet(readStatus.bytesRead);
- }
- }
- return data;
+ /**
+ * Reads and stores this column chunk's dictionary page.
+ * @throws IOException
+ */
+ protected void loadDictionary(ReadStatus readStatus) throws IOException {
+ assert readStatus.isDictionaryPage();
+ assert this.dictionary == null;
+
+ // dictData is not a local because we need to release it later.
+ this.dictData = codecName == CompressionCodecName.UNCOMPRESSED
+ ? readStatus.getPageData()
+ : decompressPageV1(readStatus);
+
+ DictionaryPage page = new DictionaryPage(
+ asBytesInput(dictData, 0, pageHeader.uncompressed_page_size),
+ pageHeader.uncompressed_page_size,
+ pageHeader.dictionary_page_header.num_values,
+ valueOf(pageHeader.dictionary_page_header.encoding.name())
+ );
+
+ this.dictionary = page.getEncoding().initDictionary(columnDescriptor,
page);
}
- // Read and decode the dictionary data
- private void readDictionaryPageData(final ReadStatus readStatus, final
ColumnReader<?> parentStatus)
- throws UserException {
+ /**
+ * Reads a compressed v1 data page or a dictionary page, both of which are
compressed
+ * in their entirety.
+ * @return decompressed Parquet page data
+ * @throws IOException
+ */
+ protected DrillBuf decompressPageV1(ReadStatus readStatus) throws
IOException {
+ Stopwatch timer = Stopwatch.createUnstarted();
+
+ PageHeader pageHeader = readStatus.getPageHeader();
+ int inputSize = pageHeader.getCompressed_page_size();
+ int outputSize = pageHeader.getUncompressed_page_size();
+ // TODO: does reporting this number have the same meaning in an async
context?
+ long start = dataReader.getPos();
+ long timeToRead;
+
+ DrillBuf inputPageData = readStatus.getPageData();
+ DrillBuf outputPageData = this.allocator.buffer(outputSize);
+
try {
- pageHeader = readStatus.getPageHeader();
- int uncompressedSize = pageHeader.getUncompressed_page_size();
- final DrillBuf dictionaryData = getDecompressedPageData(readStatus);
- Stopwatch timer = Stopwatch.createStarted();
- allocatedDictionaryBuffers.add(dictionaryData);
- DictionaryPage page = new DictionaryPage(asBytesInput(dictionaryData, 0,
uncompressedSize),
- pageHeader.uncompressed_page_size,
pageHeader.dictionary_page_header.num_values,
- valueOf(pageHeader.dictionary_page_header.encoding.name()));
- this.dictionary =
page.getEncoding().initDictionary(parentStatus.columnDescriptor, page);
- long timeToDecode = timer.elapsed(TimeUnit.NANOSECONDS);
- stats.timeDictPageDecode.addAndGet(timeToDecode);
- } catch (Exception e) {
- handleAndThrowException(e, "Error decoding dictionary page.");
+ timer.start();
+ CompressionCodecName codecName = columnChunkMetaData.getCodec();
+ CompressionCodecFactory.BytesInputDecompressor decomp =
codecFactory.getDecompressor(codecName);
+ ByteBuffer input = inputPageData.nioBuffer(0, inputSize);
+ ByteBuffer output = outputPageData.nioBuffer(0, outputSize);
+
+ decomp.decompress(input, inputSize, output, outputSize);
+ outputPageData.writerIndex(outputSize);
+ timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
+
+ logger.trace(
+ "Col: {} readPos: {} Uncompressed_size: {} pageData: {}",
+ columnChunkMetaData.toString(),
+ dataReader.getPos(), // TODO: see comment on earlier call to getPos()
+ outputSize,
+ ByteBufUtil.hexDump(outputPageData)
+ );
+
+ this.updateStats(pageHeader, "Decompress", start, timeToRead, inputSize,
outputSize);
+ } finally {
+ readStatus.setPageData(null);
+ if (inputPageData != null) {
+ inputPageData.release();
+ }
}
- }
- private void handleAndThrowException(Exception e, String msg) throws
UserException {
- UserException ex = UserException.dataReadError(e).message(msg)
- .pushContext("Row Group Start: ",
this.parentColumnReader.columnChunkMetaData.getStartingPos())
- .pushContext("Column: ",
this.parentColumnReader.schemaElement.getName())
- .pushContext("File: ", this.fileName).build(logger);
- throw ex;
+ return outputPageData;
}
- private DrillBuf decompress(PageHeader pageHeader, DrillBuf compressedData) {
- DrillBuf pageDataBuf = null;
+ /**
+ * Reads a compressed v2 data page which excluded the repetition and
definition level
+ * sections from compression.
+ * @return decompressed Parquet page data
+ * @throws IOException
+ */
+ protected DrillBuf decompressPageV2(ReadStatus readStatus) throws
IOException {
Stopwatch timer = Stopwatch.createUnstarted();
+
+ PageHeader pageHeader = readStatus.getPageHeader();
+ int inputSize = pageHeader.getCompressed_page_size();
+ int repLevelSize =
pageHeader.data_page_header_v2.getRepetition_levels_byte_length();
+ int defLevelSize =
pageHeader.data_page_header_v2.getDefinition_levels_byte_length();
+ int compDataOffset = repLevelSize + defLevelSize;
+ int outputSize = pageHeader.uncompressed_page_size;
+ // TODO: does reporting this number have the same meaning in an async
context?
+ long start = dataReader.getPos();
long timeToRead;
- int compressedSize = pageHeader.getCompressed_page_size();
- int uncompressedSize = pageHeader.getUncompressed_page_size();
- pageDataBuf = allocateTemporaryBuffer(uncompressedSize);
+
+ DrillBuf inputPageData = readStatus.getPageData();
+ DrillBuf outputPageData = this.allocator.buffer(outputSize);
+
try {
timer.start();
+ // Write out the uncompressed section
+ // Note that the following setBytes call to read the repetition and
definition level sections
+ // advances readerIndex in inputPageData but not writerIndex in
outputPageData.
+ outputPageData.setBytes(0, inputPageData, compDataOffset);
+
+ // decompress from the start of compressed data to the end of the input
buffer
+ CompressionCodecName codecName = columnChunkMetaData.getCodec();
+ CompressionCodecFactory.BytesInputDecompressor decomp =
codecFactory.getDecompressor(codecName);
+ ByteBuffer input = inputPageData.nioBuffer(compDataOffset, inputSize -
compDataOffset);
+ ByteBuffer output = outputPageData.nioBuffer(compDataOffset, outputSize
- compDataOffset);
+ decomp.decompress(
+ input,
+ inputSize - compDataOffset,
+ output,
+ outputSize - compDataOffset
+ );
+ outputPageData.writerIndex(outputSize);
+ timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
- CompressionCodecName codecName =
parentColumnReader.columnChunkMetaData.getCodec();
- BytesInputDecompressor decomp = codecFactory.getDecompressor(codecName);
- ByteBuffer input = compressedData.nioBuffer(0, compressedSize);
- ByteBuffer output = pageDataBuf.nioBuffer(0, uncompressedSize);
+ logger.trace(
+ "Col: {} readPos: {} Uncompressed_size: {} pageData: {}",
+ columnChunkMetaData.toString(),
+ dataReader.getPos(), // TODO: see comment on earlier call to getPos()
+ outputSize,
+ ByteBufUtil.hexDump(outputPageData)
+ );
- decomp.decompress(input, compressedSize, output, uncompressedSize);
- pageDataBuf.writerIndex(uncompressedSize);
- timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
- this.updateStats(pageHeader, "Decompress", 0, timeToRead,
compressedSize, uncompressedSize);
- } catch (IOException e) {
- handleAndThrowException(e, "Error decompressing data.");
+ this.updateStats(pageHeader, "Decompress", start, timeToRead, inputSize,
outputSize);
+ } finally {
+ readStatus.setPageData(null);
+ if (inputPageData != null) {
+ inputPageData.release();
+ }
}
- return pageDataBuf;
+
+ return outputPageData;
}
- @Override
- protected void nextInternal() throws IOException {
- ReadStatus readStatus = null;
+ private ReadStatus nextPageFromQueue() throws InterruptedException,
ExecutionException {
+ ReadStatus readStatus;
+ Stopwatch timer = Stopwatch.createStarted();
+
parentColumnReader.parentReader.getOperatorContext().getStats().startWait();
try {
- Stopwatch timer = Stopwatch.createStarted();
-
parentColumnReader.parentReader.getOperatorContext().getStats().startWait();
- try {
- waitForExecutionResult(); // get the result of execution
- synchronized (pageQueueSyncronize) {
- boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
- readStatus = pageQueue.take(); // get the data if no exception has
been thrown
- if (readStatus.pageData == null || readStatus == ReadStatus.EMPTY) {
- throw new DrillRuntimeException("Unexpected end of data");
- }
- //if the queue was full before we took a page out, then there would
- // have been no new read tasks scheduled. In that case, schedule a
new read.
- if (!parentColumnReader.isShuttingDown && pageQueueFull) {
- asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new
AsyncPageReaderTask(debugName, pageQueue)));
- }
+ waitForExecutionResult(); // get the result of execution
+ synchronized (pageQueueSyncronize) {
+ boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
+ readStatus = pageQueue.take(); // get the data if no exception has
been thrown
+ if (readStatus.pageData == null || readStatus == ReadStatus.EMPTY) {
+ throw new DrillRuntimeException("Unexpected end of data");
+ }
+ //if the queue was full before we took a page out, then there would
+ // have been no new read tasks scheduled. In that case, schedule a new
read.
+ if (!parentColumnReader.isShuttingDown && pageQueueFull) {
+ asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new
AsyncPageReaderTask(debugName, pageQueue)));
}
- } finally {
-
parentColumnReader.parentReader.getOperatorContext().getStats().stopWait();
- }
- long timeBlocked = timer.elapsed(TimeUnit.NANOSECONDS);
- stats.timeDiskScanWait.addAndGet(timeBlocked);
- stats.timeDiskScan.addAndGet(readStatus.getDiskScanTime());
- if (readStatus.isDictionaryPage) {
- stats.numDictPageLoads.incrementAndGet();
- stats.timeDictPageLoads.addAndGet(timeBlocked +
readStatus.getDiskScanTime());
- } else {
- stats.numDataPageLoads.incrementAndGet();
- stats.timeDataPageLoads.addAndGet(timeBlocked +
readStatus.getDiskScanTime());
}
- pageHeader = readStatus.getPageHeader();
+ } finally {
+
parentColumnReader.parentReader.getOperatorContext().getStats().stopWait();
+ }
- // TODO - figure out if we need multiple dictionary pages, I believe it
may be limited to one
- // I think we are clobbering parts of the dictionary if there can be
multiple pages of dictionary
-
- do {
- if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
- readDictionaryPageData(readStatus, parentColumnReader);
- waitForExecutionResult(); // get the result of execution
- synchronized (pageQueueSyncronize) {
- boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
- readStatus = pageQueue.take(); // get the data if no exception has
been thrown
- if (readStatus.pageData == null || readStatus == ReadStatus.EMPTY)
{
- break;
- }
- //if the queue was full before we took a page out, then there would
- // have been no new read tasks scheduled. In that case, schedule a
new read.
- if (!parentColumnReader.isShuttingDown && pageQueueFull) {
- asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new
AsyncPageReaderTask(debugName, pageQueue)));
- }
- }
- pageHeader = readStatus.getPageHeader();
- }
- } while (pageHeader.getType() == PageType.DICTIONARY_PAGE);
+ long timeBlocked = timer.elapsed(TimeUnit.NANOSECONDS);
+ stats.timeDiskScanWait.addAndGet(timeBlocked);
+ stats.timeDiskScan.addAndGet(readStatus.getDiskScanTime());
+ if (readStatus.isDictionaryPage) {
+ stats.numDictPageLoads.incrementAndGet();
+ stats.timeDictPageLoads.addAndGet(timeBlocked +
readStatus.getDiskScanTime());
+ } else {
+ stats.numDataPageLoads.incrementAndGet();
+ stats.timeDataPageLoads.addAndGet(timeBlocked +
readStatus.getDiskScanTime());
+ }
+
+ return readStatus;
+ }
+ @Override
+ protected void nextInternal() throws IOException {
+ try {
+ ReadStatus readStatus = nextPageFromQueue();
pageHeader = readStatus.getPageHeader();
- pageData = getDecompressedPageData(readStatus);
- assert (pageData != null);
+
+ if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
Review comment:
@vvysotskyi I thought the same thing in an earlier version, but I now
don't think it would work. The switch expression evaluates
`pageHeader.getType()`, and the DICTIONARY_PAGE case *modifies* pageHeader
because after loading the dictionary it loads another page. So if we fell
through from DICTIONARY_PAGE we'd need the switch expression to reevaluate
`pageHeader.getType()` and I don't think it will do that? I.e. I'd think
switches only evaluate their expression once...
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
> Move parquet to use v2 format as default
> ----------------------------------------
>
> Key: DRILL-1282
> URL: https://issues.apache.org/jira/browse/DRILL-1282
> Project: Apache Drill
> Issue Type: Improvement
> Components: Storage - Parquet
> Reporter: Jacques Nadeau
> Assignee: James Turton
> Priority: Minor
> Fix For: Future
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)