Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21070#discussion_r186603708
  
    --- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
 ---
    @@ -619,32 +608,37 @@ private int ceil8(int value) {
       /**
        * Reads the next group.
        */
    -  private void readNextGroup()  {
    -    int header = readUnsignedVarInt();
    -    this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
    -    switch (mode) {
    -      case RLE:
    -        this.currentCount = header >>> 1;
    -        this.currentValue = readIntLittleEndianPaddedOnBitWidth();
    -        return;
    -      case PACKED:
    -        int numGroups = header >>> 1;
    -        this.currentCount = numGroups * 8;
    -        int bytesToRead = ceil8(this.currentCount * this.bitWidth);
    -
    -        if (this.currentBuffer.length < this.currentCount) {
    -          this.currentBuffer = new int[this.currentCount];
    -        }
    -        currentBufferIdx = 0;
    -        int valueIndex = 0;
    -        for (int byteIndex = offset; valueIndex < this.currentCount; 
byteIndex += this.bitWidth) {
    -          this.packer.unpack8Values(in, byteIndex, this.currentBuffer, 
valueIndex);
    -          valueIndex += 8;
    -        }
    -        offset += bytesToRead;
    -        return;
    -      default:
    -        throw new ParquetDecodingException("not a valid mode " + 
this.mode);
    +  private void readNextGroup() {
    +    try {
    +      int header = readUnsignedVarInt();
    +      this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
    +      switch (mode) {
    +        case RLE:
    +          this.currentCount = header >>> 1;
    +          this.currentValue = readIntLittleEndianPaddedOnBitWidth();
    +          return;
    +        case PACKED:
    +          int numGroups = header >>> 1;
    +          this.currentCount = numGroups * 8;
    +
    +          if (this.currentBuffer.length < this.currentCount) {
    +            this.currentBuffer = new int[this.currentCount];
    +          }
    +          currentBufferIdx = 0;
    +          int valueIndex = 0;
    +          while (valueIndex < this.currentCount) {
    +            // values are bit packed 8 at a time, so reading bitWidth will 
always work
    +            ByteBuffer buffer = in.slice(bitWidth);
    +            this.packer.unpack8Values(
    +                buffer, buffer.arrayOffset() + buffer.position(), 
this.currentBuffer, valueIndex);
    --- End diff --
    
    shall we assume the `ByteBuffer` may not be on-heap?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to