Hello everyone!
First of all, I hope I'm in the right place. The contribution guidelines
directed me here after I discovered that registration for the Jira tracker
is closed. I'm a Ph.D. student at RPTU Kaiserslautern-Landau, and my
current research revolves around sorting-based improvements to RLE
compression for Dremel-encoded data. In the process, I've discovered the
following issue in Parquet:
Parquet-MR seems to choose the encoding for a column based on the values
on the first page. If all values on that first page are null, it falls
back to plain encoding. This is problematic if the dataset is (initially)
very sparse or has been sorted so that the nulls come first. Note that
sort operations in Spark place nulls first by default, so it is easy to
trigger this issue without noticing it. Even a user who is aware of the
problem may be unable to find a sort order that avoids it because of
correlations in the dataset. The issue can also be triggered by sorting
on a correlated attribute, e.g., an event log stored in chronological
order with an attribute that was only added after some time, so that all
earlier records are null for it. Thus, this issue has an impact beyond
academia.
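As a stopgap on the writer side, explicitly sorting with nulls last before
writing should avoid the all-null first page. The following is only a
sketch; badDf and the column "foo" refer to the reproduction code below:

import org.apache.spark.sql.functions.col

// Possible mitigation (sketch only, not verified for every case): push the
// nulls to the end of the sort order so that the first written page contains
// non-null values and the dictionary/RLE path is kept.
val mitigatedDf = badDf.sort(col("foo").asc_nulls_last)
mitigatedDf.write.parquet("mitigated")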
To reproduce the issue, create two data frames, one of which starts with
more nulls than fit on a page, and write them to disk as Parquet:

val good = Iterator.fill(25000)(5) ++ Iterator.fill(25000)(null) // nulls last
val bad = Iterator.fill(25000)(null) ++ Iterator.fill(25000)(5)  // nulls first
// do spark stuff for creating DFs here, I have attached the complete Scala code
goodDf.write.parquet("good")
badDf.write.parquet("bad")
As can be seen, both DataFrames contain a long run of "5" and a long run
of nulls. We would expect both to compress very well, but if we check the
resulting files, we find that the "bad" file is an order of magnitude
larger. Inspecting them with the parquet-cli "pages" command shows that
the "good" file uses dictionary and RLE compression as expected:
Column: foo
--------------------------------------------------------------------------------
  page   type  enc  count   avg size   size       rows     nulls   min / max
  0-D    dict  S _  1       4.00 B     4 B
  0-1    data  S R  20000   0.00 B     12 B
  0-2    data  S R  20000   0.00 B     14 B
  0-3    data  S R  10000   0.00 B     9 B
The "bad" file, on the other hand, does not have a dictionary and is not
RLE compressed:
Column: foo
--------------------------------------------------------------------------------
  page   type  enc  count   avg size   size        rows     nulls   min / max
  0-0    data  S _  20000   0.00 B     8 B
  0-1    data  S _  20000   3.00 B     58.604 kB
  0-2    data  S _  10000   4.00 B     39.070 kB
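To double-check the size difference outside of parquet-cli, here is a quick
sketch that sums the file sizes in the two output directories (paths as in
the demo code attached at the end of this mail):

import java.io.File

// Rough sanity check: total on-disk size of each output directory written
// by the attached demo code.
def dirSize(path: String): Long =
  new File(path).listFiles().map(_.length).sum

println(s"good: ${dirSize("./bugDemo/good")} bytes")
println(s"bad:  ${dirSize("./bugDemo/bad")} bytes")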
I am using Spark 3.2.0, which comes with Parquet 1.12.1. Unfortunately,
I am not familiar enough with the Parquet codebase to submit a fix
myself, but I'm happy to help if further questions arise.
Kind Regards
Patrick
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.Row
object BugDemo {

  def main(args: Array[String]): Unit = {
    // Initialize Spark
    val path = "./bugDemo"
    implicit val spark = SparkSession.builder()
      .appName("Generate bug demo entries")
      .master("local")
      .getOrCreate()

    // Generate the data
    val count = 25000
    val good = Iterator.fill(count)(5) ++ Iterator.fill(count)(null) // nulls last
    val bad = Iterator.fill(count)(null) ++ Iterator.fill(count)(5)  // nulls first

    // write
    write(spark, good, s"$path/good")
    write(spark, bad, s"$path/bad")
  }

  def write(spark: SparkSession, data: Iterator[Any], path: String): Unit = {
    val schema = StructType(Array(StructField("foo", IntegerType, true)))
    val rows = data.map(row => new GenericRowWithSchema(Array(row), schema): Row).toSeq
    val rdd = spark.sparkContext.parallelize(rows)
    spark.sqlContext.createDataFrame(rdd, schema).write.parquet(path)
  }
}