[
https://issues.apache.org/jira/browse/PARQUET-980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Herman van Hovell updated PARQUET-980:
--
Description:
Parquet MR 1.8.2 cannot read row groups that are larger than 2 GB.
See: https://github.com/apache/parquet-mr/blob/parquet-1.8.x/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L1064
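For context, the failure at the linked line comes down to 32-bit arithmetic: the total length of a row group's consecutive chunks has to fit in a Java int, so anything past Integer.MAX_VALUE wraps around to a negative value and the buffer allocation blows up. A minimal Scala sketch of that failure mode (a simplified model, not the actual ParquetFileReader code):
{noformat}
// Simplified model of the overflow, not the actual ParquetFileReader code:
// a row group larger than Integer.MAX_VALUE (~2 GB) no longer fits in an Int,
// so the computed buffer size wraps around to a negative number.
val rowGroupBytes: Long = 3L * 1024 * 1024 * 1024   // ~3 GB row group
val lengthAsInt: Int = rowGroupBytes.toInt           // wraps to -1073741824
val chunkBuffer = new Array[Byte](lengthAsInt)       // java.lang.NegativeArraySizeException
{noformat}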
We are seeing this when writing skewed records. The skew throws off the estimation of the memory check interval in the InternalParquetRecordWriter (a simplified model of that heuristic is sketched after the stack trace below). The following Spark code illustrates the problem:
{noformat}
/**
 * Create a data frame that will make parquet write a file with a row group larger than 2 GB. Parquet
 * only checks the size of the row group after writing a number of records. This number is based on the
 * average row size of the already written records. This is problematic in the following scenario:
 * - The initial (100) records in the row group are relatively small.
 * - The InternalParquetRecordWriter checks if it needs to write to disk (it should not), assumes
 *   that the remaining records have a similar size, and (greatly) increases the check interval
 *   (usually to 10,000 records).
 * - The remaining records are much larger than expected, making the row group larger than 2 GB
 *   (which makes reading the row group impossible).
 *
 * The data frame below illustrates such a scenario. This creates a row group of approximately 4 GB.
 */
val badDf = spark.range(0, 2200, 1, 1).mapPartitions { iterator =>
  var i = 0
  val random = new scala.util.Random(42)
  val buffer = new Array[Char](750000)
  iterator.map { id =>
    // the first 200 records have a length of 1K and the remaining 2000 have a length of 750K.
    val numChars = if (i < 200) 1000 else 750000
    i += 1
    // create a random array
    var j = 0
    while (j < numChars) {
      // Generate a char (borrowed from scala.util.Random)
      buffer(j) = (random.nextInt(0xD800 - 1) + 1).toChar
      j += 1
    }
    // create a string: the string constructor will copy the buffer.
    new String(buffer, 0, numChars)
  }
}
badDf.write.parquet("somefile")
val corruptedDf = spark.read.parquet("somefile")
corruptedDf.select(count(lit(1)), max(length($"value"))).show()
{noformat}
The latter fails with the following exception:
{noformat}
java.lang.NegativeArraySizeException
  at org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:1064)
  at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:698)
  ...
{noformat}
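For reference, the writer-side heuristic that lets the row group overshoot can be modeled roughly as below. This is a simplified Scala sketch, not the actual InternalParquetRecordWriter code; the 100-record first check, the 10,000-record cap, and the 128 MB target row-group size are assumptions based on parquet-mr defaults.
{noformat}
// Simplified model of InternalParquetRecordWriter's size-check heuristic,
// not the actual parquet-mr code. After a check, the writer extrapolates the
// average size of the records written so far to pick the next check point.
def nextCheckAt(recordsWritten: Long,
                bufferedBytes: Long,
                targetRowGroupBytes: Long = 128L * 1024 * 1024): Long = {
  val avgRecordSize  = math.max(1L, bufferedBytes / recordsWritten)
  val remainingBytes = targetRowGroupBytes - bufferedBytes
  // check again roughly halfway to the projected flush point,
  // but never more than 10,000 records ahead (assumed cap)
  math.min(
    math.max(100L, recordsWritten + remainingBytes / avgRecordSize / 2),
    recordsWritten + 10000L)
}

// After 200 records of ~1 KB each, the writer projects tens of thousands of
// similar records before the 128 MB target is reached, so the next check is
// deferred by the full 10,000-record cap. If the records that follow are
// ~750,000 characters each instead, gigabytes are buffered before the size is
// ever re-checked, which is how the row group grows past 2 GB.
val checkAt = nextCheckAt(recordsWritten = 200, bufferedBytes = 200 * 1024)
// checkAt == 10200 under these assumptions
{noformat}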
-This seems to be fixed by commit
https://github.com/apache/parquet-mr/commit/6b605a4ea05b66e1a6bf843353abcb4834a4ced8
in parquet 1.9.x. Is there any chance that we can fix this in 1.8.x?-