Hey all,
Recently we hit a very strange issue: some jobs fail with the following error:

> Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1369 in block 3 in file hdfs://test/testxx/part-00668-70b4a5c5-e016-4215-ad4e-1e954d135e48-c000.zstd.parquet
>   at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:254)
>   at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
>   at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
>   at org.apache.spark.sql.execution.datasources.RecordReaderIterator$$anon$1.hasNext(RecordReaderIterator.scala:61)
>   at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:104)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>   at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>   at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1160)
>   at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
>   at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1214)
>   at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>   at scala.collection.Iterator.foreach(Iterator.scala:943)
>   at scala.collection.Iterator.foreach$(Iterator.scala:943)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>   at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307)
>   at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:53)
>   at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:434)
>   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2089)
>   at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:269)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 254
>   at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:412)
>   at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:229)
>   ... 22 more

After investigating this file, we discovered that in
RunLengthBitPackingHybridDecoder#readNext, when Parquet decodes the next
repetition/definition level, the run header can unexpectedly come back as
zero. This causes the mode to be set to RLE and the decoder to return an
incorrect value.
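
For context, the run header in the RLE/bit-packing hybrid encoding is a
single unsigned varint whose low bit selects the mode and whose remaining
bits carry the count. The snippet below is our own illustration of that
rule from the Parquet encoding spec, not Parquet source:

    // rle-header        = varint(count << 1)          -> low bit 0
    // bit-packed-header = varint(numGroups << 1 | 1)  -> low bit 1
    static String describeHeader(int header) {
      if ((header & 1) == 0) {
        // A header of 0 decodes as "RLE run of 0 values",
        // which a well-formed writer should never emit.
        return "RLE run of " + (header >>> 1) + " values";
      }
      return "bit-packed run of " + (header >>> 1) * 8 + " values";
    }

So a zero header is only representable as an RLE run of length zero. Here
is the decoder's readNext, where we hit it: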

> private void readNext() throws IOException {
>   Preconditions.checkArgument(in.available() > 0, "Reading past RLE/BitPacking stream.");
>   // Here header returns 0, making mode RLE, and then currentValue
>   // is the wrong value.
>   final int header = BytesUtils.readUnsignedVarInt(in);
>   mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
>   switch (mode) {
>   case RLE:
>     currentCount = header >>> 1;
>     LOG.debug("reading {} values RLE", currentCount);
>     currentValue = BytesUtils.readIntLittleEndianPaddedOnBitWidth(in, bitWidth);
>     break;
>   case PACKED:
>     int numGroups = header >>> 1;
>     currentCount = numGroups * 8;
>     LOG.debug("reading {} values BIT PACKED", currentCount);
>     currentBuffer = new int[currentCount]; // TODO: reuse a buffer
>     byte[] bytes = new byte[numGroups * bitWidth];
>     // At the end of the file RLE data though, there might not be that many bytes left.
>     int bytesToRead = (int) Math.ceil(currentCount * bitWidth / 8.0);
>     bytesToRead = Math.min(bytesToRead, in.available());
>     new DataInputStream(in).readFully(bytes, 0, bytesToRead);
>     for (int valueIndex = 0, byteIndex = 0; valueIndex < currentCount; valueIndex += 8, byteIndex += bitWidth) {
>       packer.unpack8Values(bytes, byteIndex, currentBuffer, valueIndex);
>     }
>     break;
>   default:
>     throw new ParquetDecodingException("not a valid mode " + mode);
>   }
> }
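
To double-check the failure mode, here is a minimal, self-contained
sketch (the bytes are hand-crafted, not taken from the real file) that
feeds a stream beginning with a zero header through the decoder:

    import java.io.ByteArrayInputStream;
    import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;

    public class ZeroHeaderDemo {
      public static void main(String[] args) throws Exception {
        // Byte 0: varint header 0x00 -> RLE run with count = 0 (invalid).
        // Byte 1: the padded "repeated value" the RLE branch still reads.
        // Bytes 2-3: a legitimate RLE run (header 6 -> count 3, value 1)
        // that the decoder never reaches.
        byte[] bad = {0x00, 0x00, 0x06, 0x01};
        RunLengthBitPackingHybridDecoder decoder =
            new RunLengthBitPackingHybridDecoder(1, new ByteArrayInputStream(bad));
        for (int i = 0; i < 4; i++) {
          // Prints 0 four times; a well-formed stream would yield 1, 1, 1.
          System.out.println(decoder.readInt());
        }
      }
    }

If we read the code correctly, the zero header leaves currentCount at 0,
readInt then decrements it to -1, and from that point readNext is never
called again, so the decoder keeps returning the stale currentValue. That
would explain both the wrong levels and the ArrayIndexOutOfBoundsException
further up the stack.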

After comparing a correct file with the corrupted one, we confirmed that
the header should not be 0 and that the mode should be PACKED. However,
even after a thorough inspection of RunLengthBitPackingHybridEncoder, we
are puzzled as to how bitPackedRun could ever produce a zero header.
Therefore, we are seeking help from the community.
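
In case it helps with reproducing the analysis: when diffing the good and
bad files, one way to spot the bad header is to walk the raw levels stream
run by run. A rough sketch (dumpRuns, its arguments, and the skip math are
our own illustration on top of BytesUtils, not Parquet code):

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import org.apache.parquet.bytes.BytesUtils;

    // Walk an RLE/bit-packed levels stream and print every run header,
    // so the first zero (or otherwise implausible) header stands out.
    static void dumpRuns(byte[] levelBytes, int bitWidth) throws IOException {
      ByteArrayInputStream in = new ByteArrayInputStream(levelBytes);
      while (in.available() > 0) {
        int header = BytesUtils.readUnsignedVarInt(in);
        if ((header & 1) == 0) {
          System.out.println("RLE    count=" + (header >>> 1));
          in.skip((bitWidth + 7) / 8);          // skip the single padded value
        } else {
          int numGroups = header >>> 1;
          System.out.println("PACKED values=" + numGroups * 8);
          in.skip((long) numGroups * bitWidth); // skip the packed groups
        }
      }
    }

Since a valid bit-packed header always has its low bit set, the encoder
should be unable to emit a literal 0x00 header, which is exactly what
makes this file so puzzling.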
