Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10593#discussion_r49822023
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java ---
    @@ -0,0 +1,271 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.datasources.parquet;
    +
    +import org.apache.parquet.Preconditions;
    +import org.apache.parquet.bytes.BytesUtils;
    +import org.apache.parquet.column.values.ValuesReader;
    +import org.apache.parquet.column.values.bitpacking.BytePacker;
    +import org.apache.parquet.column.values.bitpacking.Packer;
    +import org.apache.parquet.io.ParquetDecodingException;
    +import org.apache.spark.sql.execution.vectorized.ColumnVector;
    +
    +/**
    + * A values reader for Parquet's run-length encoded data. This is based 
off of the version in
    + * parquet-mr with these changes:
    + *  - Supports the vectorized interface.
    + *  - Works on byte arrays(byte[]) instead of making byte streams.
    + *
    + * This encoding is used in multiple places:
    + *  - Definition/Repetition levels
    + *  - Dictionary ids.
    + */
    +public final class VectorizedRleValuesReader extends ValuesReader {
    +  // Current decoding mode.
    +  private enum MODE {
    +    RLE,
    +    PACKED
    +  }
    +
    +  // Encoded data.
    +  private byte[] in;
    +  private int end;
    +  private int offset;
    +
    +  // bit/byte width of decoded data and utility to batch unpack them.
    +  private int bitWidth;
    +  private int bytesWidth;
    +  private BytePacker packer;
    +
    +  // Current decoding mode and values
    +  private MODE mode;
    +  private int currentCount;
    +  private int currentValue;
    +
    +  // Buffer of decoded values if the values are PACKED.
    +  private int[] currentBuffer = new int[16];
    +  private int currentBufferIdx = 0;
    +
    +  // If true, the bit width is fixed. This decoder is used in different places and this also
    +  // controls if we need to read the bitwidth from the beginning of the data stream.
    +  private final boolean fixedWidth;
    +
    +  public VectorizedRleValuesReader() {
    +    fixedWidth = false;
    +  }
    +
    +  public VectorizedRleValuesReader(int bitWidth) {
    +    fixedWidth = true;
    +    init(bitWidth);
    +  }
    +
    +  @Override
    +  public void initFromPage(int valueCount, byte[] page, int start) {
    +    this.offset = start;
    +    this.in = page;
    +    if (fixedWidth) {
    +      int length = readIntLittleEndian();
    +      this.end = this.offset + length;
    +    } else {
    +      this.end = page.length;
    +      if (this.end != this.offset) init(page[this.offset++] & 255);
    +    }
    +    this.currentCount = 0;
    +  }
    +
    +  /**
    +   * Initializes the internal state for decoding ints of `bitWidth`.
    +   */
    +  private void init(int bitWidth) {
    +    Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32");
    +    this.bitWidth = bitWidth;
    +    this.bytesWidth = BytesUtils.paddedByteCountFromBits(bitWidth);
    +    this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
    +  }
    +
    +  @Override
    +  public int getNextOffset() {
    +    return this.end;
    +  }
    +
    +  @Override
    +  public boolean readBoolean() {
    +    return this.readInteger() != 0;
    +  }
    +
    +  @Override
    +  public void skip() {
    +    this.readInteger();
    +  }
    +
    +  @Override
    +  public int readValueDictionaryId() {
    +    return readInteger();
    +  }
    +
    +  @Override
    +  public int readInteger() {
    +    if (this.currentCount == 0) { this.readNextGroup(); }
    +
    +    --this.currentCount;
    --- End diff --
    
    Not a big deal, but the prefix decrement (`--this.currentCount`) is an old C++ habit :)



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to