dongjoon-hyun commented on a change in pull request #32753:
URL: https://github.com/apache/spark/pull/32753#discussion_r659108572



##########
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java
##########
@@ -33,31 +51,107 @@
   /** The remaining number of values to read in the current batch */
   int valuesToReadInBatch;
 
-  ParquetReadState(int maxDefinitionLevel) {
+  ParquetReadState(int maxDefinitionLevel, PrimitiveIterator.OfLong 
rowIndexes) {
     this.maxDefinitionLevel = maxDefinitionLevel;
+    this.rowRanges = rowIndexes == null ? null : constructRanges(rowIndexes);
+    nextRange();
   }
 
   /**
-   * Called at the beginning of reading a new batch.
+   * Construct a list of row ranges from the given `rowIndexes`. For example, 
suppose the
+   * `rowIndexes` are `[0, 1, 2, 4, 5, 7, 8, 9]`, it will be converted into 3 
row ranges:
+   * `[0-2], [4-5], [7-9]`.
    */
-  void resetForBatch(int batchSize) {
+  private Iterator<RowRange> constructRanges(PrimitiveIterator.OfLong 
rowIndexes) {
+    List<RowRange> rowRanges = new ArrayList<>();
+    long currentStart = Long.MIN_VALUE;
+    long previous = Long.MIN_VALUE;
+
+    while (rowIndexes.hasNext()) {
+      long idx = rowIndexes.nextLong();
+      if (previous == Long.MIN_VALUE) {
+        currentStart = previous = idx;
+      } else if (previous + 1 != idx) {
+        RowRange range = new RowRange(currentStart, previous);
+        rowRanges.add(range);
+        currentStart = previous = idx;
+      } else {
+        previous = idx;
+      }
+    }
+
+    if (previous != Long.MIN_VALUE) {
+      rowRanges.add(new RowRange(currentStart, previous));
+    }
+
+    return rowRanges.iterator();
+  }
+
+  /**
+   * Must be called at the beginning of reading a new batch.
+   */
+  void resetForNewBatch(int batchSize) {
     this.offset = 0;
     this.valuesToReadInBatch = batchSize;
   }
 
   /**
-   * Called at the beginning of reading a new page.
+   * Must be called at the beginning of reading a new page.
    */
-  void resetForPage(int totalValuesInPage) {
+  void resetForNewPage(int totalValuesInPage, long pageFirstRowIndex) {
     this.valuesToReadInPage = totalValuesInPage;
+    this.rowId = pageFirstRowIndex;
   }
 
   /**
-   * Advance the current offset to the new values.
+   * Returns the start index of the current row range.
    */
-  void advanceOffset(int newOffset) {
+  long currentRangeStart() {
+    return currentRange.start;
+  }
+
+  /**
+   * Returns the end index of the current row range.
+   */
+  long currentRangeEnd() {
+    return currentRange.end;
+  }
+
+  /**
+   * Advance the current offset and rowId to the new values.
+   */
+  void advanceOffsetAndRowId(int newOffset, long newRowId) {
     valuesToReadInBatch -= (newOffset - offset);
-    valuesToReadInPage -= (newOffset - offset);
+    valuesToReadInPage -= (newRowId - rowId);
     offset = newOffset;
+    rowId = newRowId;
+  }
+
+  /**
+   * Advance to the next range.
+   */
+  void nextRange() {
+    if (rowRanges == null) {
+      currentRange = MAX_ROW_RANGE;
+    } else {
+      if (!rowRanges.hasNext()) {
+        currentRange = MIN_ROW_RANGE;
+      } else {
+        currentRange = rowRanges.next();
+      }
+    }

Review comment:
       Shall we flatten more?
   ```java
   if (rowRanges == null) {
     currentRange = MAX_ROW_RANGE;
   } else if (!rowRanges.hasNext()) {
     currentRange = MIN_ROW_RANGE;
   } else {
     currentRange = rowRanges.next();
   }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to