Hey all,
We recently ran into a very strange issue: some jobs fail with the following error:
> Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1369 in block 3 in file hdfs://test/testxx/part-00668-70b4a5c5-e016-4215-ad4e-1e954d135e48-c000.zstd.parquet
>   at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:254)
>   at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
>   at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
>   at org.apache.spark.sql.execution.datasources.RecordReaderIterator$$anon$1.hasNext(RecordReaderIterator.scala:61)
>   at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:104)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>   at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>   at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1160)
>   at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
>   at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1214)
>   at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>   at scala.collection.Iterator.foreach(Iterator.scala:943)
>   at scala.collection.Iterator.foreach$(Iterator.scala:943)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>   at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307)
>   at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:53)
>   at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:434)
>   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2089)
>   at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:269)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 254
>   at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:412)
>   at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:229)
>   ... 22 more
>
After investigating the file, we found that in RunLengthBitPackingHybridDecoder#readNext, when Parquet decodes the next repetition/definition level run, the header unexpectedly comes back as zero. That sets the mode to RLE and makes the decoder return a wrong value.
> private void readNext() throws IOException {
>   Preconditions.checkArgument(in.available() > 0, "Reading past RLE/BitPacking stream.");
>   // Here header comes back as 0, so mode becomes RLE and currentValue is wrong.
>   final int header = BytesUtils.readUnsignedVarInt(in);
>   mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
>   switch (mode) {
>   case RLE:
>     currentCount = header >>> 1;
>     LOG.debug("reading {} values RLE", currentCount);
>     currentValue = BytesUtils.readIntLittleEndianPaddedOnBitWidth(in, bitWidth);
>     break;
>   case PACKED:
>     int numGroups = header >>> 1;
>     currentCount = numGroups * 8;
>     LOG.debug("reading {} values BIT PACKED", currentCount);
>     currentBuffer = new int[currentCount]; // TODO: reuse a buffer
>     byte[] bytes = new byte[numGroups * bitWidth];
>     // At the end of the file RLE data though, there might not be that many bytes left.
>     int bytesToRead = (int) Math.ceil(currentCount * bitWidth / 8.0);
>     bytesToRead = Math.min(bytesToRead, in.available());
>     new DataInputStream(in).readFully(bytes, 0, bytesToRead);
>     for (int valueIndex = 0, byteIndex = 0; valueIndex < currentCount; valueIndex += 8, byteIndex += bitWidth) {
>       packer.unpack8Values(bytes, byteIndex, currentBuffer, valueIndex);
>     }
>     break;
>   default:
>     throw new ParquetDecodingException("not a valid mode " + mode);
>   }
> }
>
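To make the failure mode concrete, here is a tiny standalone sketch (our own illustration, not Parquet code; the header values are made up) of how the hybrid header is interpreted. It shows that a zero header decodes as an RLE run of length 0, which a well-formed stream should never contain:

// Standalone illustration of the RLE/bit-packing hybrid header layout:
// the low bit selects the mode, the remaining bits carry the RLE run
// length or the number of bit-packed 8-value groups.
public class HybridHeaderDemo {
  public static void main(String[] args) {
    int[] headers = {0, 6, 7}; // 0 is the suspicious header we observed
    for (int header : headers) {
      boolean rle = (header & 1) == 0;
      int count = header >>> 1;
      System.out.printf("header=%d -> mode=%s, count=%d%n",
          header, rle ? "RLE" : "PACKED", count);
    }
    // Prints:
    //   header=0 -> mode=RLE, count=0     (a run of 0 values: never valid)
    //   header=6 -> mode=RLE, count=3
    //   header=7 -> mode=PACKED, count=3  (3 groups = 24 values)
  }
}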
After comparing a correct file with the corrupt one, we can see that the header at this position should not be 0 and the mode should be PACKED. As far as we can tell, a bit-packed run header is written as (numGroups << 1) | 1, so its low bit is always set and it can never be zero. Even after a thorough inspection, we are puzzled as to how bitPackedRun in RunLengthBitPackingHybridEncoder could ever produce a zero header, so we are seeking help from the community.
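For anyone who wants to poke at the raw bytes, below is a rough sketch of the kind of standalone dump helper that could be used to walk an already-decompressed RLE/bit-packed level stream and print each run header, so a bogus zero header stands out when a good file is compared against the corrupt one. HybridStreamDump, its dump method, and the sample bytes in main are all hypothetical; the varint reader simply mirrors the layout that BytesUtils.readUnsignedVarInt decodes:

import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical diagnostic, not a Parquet utility.
public class HybridStreamDump {

  // Reads one unsigned LEB128 varint, the same layout that
  // BytesUtils.readUnsignedVarInt decodes.
  static int readUnsignedVarInt(InputStream in) throws IOException {
    int value = 0;
    for (int shift = 0; ; shift += 7) {
      int b = in.read();
      if (b < 0) throw new EOFException("stream ended inside a varint");
      value |= (b & 0x7F) << shift;
      if ((b & 0x80) == 0) return value;
    }
  }

  // Walks the stream for the given bit width and prints every run header.
  static void dump(byte[] stream, int bitWidth) throws IOException {
    InputStream in = new ByteArrayInputStream(stream);
    while (in.available() > 0) {
      int header = readUnsignedVarInt(in);
      if ((header & 1) == 0) {                 // RLE run
        int count = header >>> 1;
        in.skip((bitWidth + 7) / 8);           // skip the repeated value
        System.out.printf("RLE    count=%d%n", count);
        if (count == 0) System.out.println("  ^ zero header: invalid run!");
      } else {                                 // bit-packed run
        int numGroups = header >>> 1;
        in.skip((long) numGroups * bitWidth);  // 8 values per group
        System.out.printf("PACKED groups=%d (%d values)%n", numGroups, numGroups * 8);
      }
    }
  }

  public static void main(String[] args) throws IOException {
    // Made-up stream, bitWidth=1: header 7 = 3 bit-packed groups (3 bytes
    // of packed values), then header 4 = RLE run of 2 with a one-byte value.
    byte[] sample = {0x07, (byte) 0xFF, 0x00, (byte) 0xAA, 0x04, 0x01};
    dump(sample, 1);
  }
}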