sachouche commented on a change in pull request #1330: DRILL-6147: Adding 
Columnar Parquet Batch Sizing functionality
URL: https://github.com/apache/drill/pull/1330#discussion_r198937973
 
 

 ##########
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
 ##########
 @@ -90,22 +91,161 @@ public long readFields(long recordsToReadInThisPass) 
throws IOException {
       recordsReadInCurrentPass = readRecordsInBulk((int) 
recordsToReadInThisPass);
     }
 
+    // Publish this information
+    parentReader.readState.setValuesReadInCurrentPass((int) 
recordsReadInCurrentPass);
+
+    // Update the stats
     
parentReader.parquetReaderStats.timeVarColumnRead.addAndGet(timer.elapsed(TimeUnit.NANOSECONDS));
 
     return recordsReadInCurrentPass;
   }
 
   private int readRecordsInBulk(int recordsToReadInThisPass) throws 
IOException {
-    int recordsReadInCurrentPass = -1;
+    int batchNumRecords = recordsToReadInThisPass;
+    List<VarLenColumnBatchStats> columnStats = new 
ArrayList<VarLenColumnBatchStats>(columns.size());
+    int prevReadColumns = -1;
+    boolean overflowCondition = false;
+
+    for (VLColumnContainer columnContainer : orderedColumns) {
+      VarLengthColumn<?> columnReader = columnContainer.column;
+
+      // Read the column data
+      int readColumns = columnReader.readRecordsInBulk(batchNumRecords);
+      assert readColumns <= batchNumRecords : "Reader cannot return more 
values than requested..";
+
+      if (!overflowCondition) {
+        if (prevReadColumns >= 0 && prevReadColumns != readColumns) {
+          overflowCondition = true;
+        } else {
+          prevReadColumns = readColumns;
+        }
+      }
+
+      // Enqueue this column entry information to handle overflow conditions; 
we will not know
+      // whether an overflow happened till all variable length columns have 
been processed
+      columnStats.add(new VarLenColumnBatchStats(columnReader.valueVec, 
readColumns));
 
+      // Decrease the number of records to read when a column returns less 
records (minimize overflow)
+      if (batchNumRecords > readColumns) {
+        batchNumRecords = readColumns;
+        // it seems this column caused an overflow (higher layer will not ask 
for more values than remaining)
+        ++columnContainer.numCausedOverflows;
+      }
+    }
+
+    // Set the value-count for each column
     for (VarLengthColumn<?> columnReader : columns) {
-      int readColumns = 
columnReader.readRecordsInBulk(recordsToReadInThisPass);
-      assert (readColumns >= 0 && recordsReadInCurrentPass == readColumns || 
recordsReadInCurrentPass == -1);
+      columnReader.valuesReadInCurrentPass = batchNumRecords;
+    }
+
+    // Publish this batch statistics
+    publishBatchStats(columnStats, batchNumRecords);
 
-      recordsReadInCurrentPass = readColumns;
+    // Handle column(s) overflow if any
+    if (overflowCondition) {
+      handleColumnOverflow(columnStats, batchNumRecords);
     }
 
-    return recordsReadInCurrentPass;
+    return batchNumRecords;
+  }
+
+  private void handleColumnOverflow(List<VarLenColumnBatchStats> columnStats, 
int batchNumRecords) {
+    // Overflow would happen if a column returned more values than 
"batchNumRecords"; this can happen
+    // when a column Ci is called first, returns num-values-i, and then 
another column Cj is called which
+    // returns fewer values than num-values-i.
+    RecordBatchOverflow.Builder builder = null;
+
+    // We need to collect all columns which are subject to an overflow (except 
for the ones which are already
+    // returning values from previous batch overflow)
+    for (VarLenColumnBatchStats columnStat : columnStats) {
+      if (columnStat.numValuesRead > batchNumRecords) {
+        // We need to figure out whether this column was already returning 
values from a previous batch
+        // overflow; if it is, then this is a NOOP (as the overflow data is 
still available to be replayed)
+        if 
(fieldHasAlreadyOverflowData(columnStat.vector.getField().getName())) {
 
 Review comment:
   Sure!

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to