Hello everyone!

First of all, I hope I'm in the right place. The contribution guidelines directed me here after I discovered the registration in the Jira tracker is closed. I'm a Ph.D. student at RPTU Kaiserslautern-Landau, and my current research revolves around sorting-based improvements to RLE compression for Dremel-encoded data. In the process, I've discovered the following issue in Parquet:

Parquet-MR seems to choose the encoding based on the values on the first page. If all values on that first page are null, it falls back to plain encoding. This is problematic if the dataset is (initially) very sparse or has been sorted. Note that sort operations in Spark sort nulls first by default, so it is easy to trigger this issue without noticing it. Even then, correlations in the dataset may prevent a user from finding a sort order which does not trigger the bug. Also note that it can be triggered by sorting by a correlated attribute, e.g., an event log stored in chronological order with an attribute that has only been added after some time. Thus, this issue has an impact beyond academia.
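
For the sorting point above, a minimal sketch of what the default ordering does (the DataFrame df and the column name ts are made up for this example):

import org.apache.spark.sql.functions.col

// df and "ts" are placeholders. Spark's ascending sort places nulls first
// by default, so a plain orderBy puts the null run at the start of the output,
// i.e., on the first pages of the written Parquet column chunk.
val nullsFirst = df.orderBy(col("ts"))                // same as col("ts").asc_nulls_first
// An explicit nulls-last ordering avoids the leading null run:
val nullsLast = df.orderBy(col("ts").asc_nulls_last)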

To reproduce the issue, create two DataFrames, one of which starts with more nulls than fit on a page, and write them to disk as Parquet:

val good = Iterator.fill(25000)(5) ++ Iterator.fill(25000)(null) // nulls last
val bad = Iterator.fill(25000)(null) ++ Iterator.fill(25000)(5) // nulls first
// do spark stuff for creating DFs here, I have attached the complete Scala code
goodDf.write.parquet("good")
badDf.write.parquet("bad")

As can be seen, both DataFrames contain a long run of "5" and a long run of nulls. We would expect both to compress very well, but if we check the resulting files, we find that the "bad" file is an order of magnitude larger. If we inspect them with parquet-cli pages, the "good" file uses dictionary encoding and RLE as expected:

Column: foo
--------------------------------------------------------------------------------
  page   type  enc   count   avg size   size       rows nulls   min / max
  0-D    dict  S _   1       4.00 B     4 B
  0-1    data  S R   20000   0.00 B     12 B
  0-2    data  S R   20000   0.00 B     14 B
  0-3    data  S R   10000   0.00 B     9 B
The "bad" file, on the other hand, does not have a dictionary and is not RLE compressed:

Column: foo
--------------------------------------------------------------------------------
  page   type  enc  count   avg size   size       rows nulls   min / max
  0-0    data  S _  20000   0.00 B     8 B
  0-1    data  S _  20000   3.00 B     58.604 kB
  0-2    data  S _  10000   4.00 B     39.070 kB
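
As a quick cross-check of the size difference, the part files on disk can be summed up directly. A minimal sketch (the helper parquetSize is made up; the paths are the "good" and "bad" directories written above):

import java.io.File

// Made-up helper: total size of the Parquet part files in an output directory.
def parquetSize(dir: String): Long =
  new File(dir).listFiles()
    .filter(f => f.isFile && f.getName.endsWith(".parquet"))
    .map(_.length)
    .sum

println(s"good: ${parquetSize("good")} B")
println(s"bad:  ${parquetSize("bad")} B")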

I am using Spark 3.2.0, which comes with Parquet 1.12.1. Unfortunately, I am not familiar enough with the Parquet codebase to submit a fix myself, but I'm happy to help if further questions arise.

Kind Regards

Patrick

The complete Scala code mentioned above:

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.Row

object BugDemo {

  def main(args: Array[String]) : Unit = {
    // Initialize Spark
    val path = "./bugDemo"
    val spark = SparkSession.builder()
        .appName("Generate bug demo entries")
        .master("local")
        .getOrCreate()

    // Generate the data
    val count = 25000
    val good = Iterator.fill(count)(5) ++ Iterator.fill(count)(null) // nulls last
    val bad = Iterator.fill(count)(null) ++ Iterator.fill(count)(5) // nulls first

    // Write both variants to separate Parquet directories
    write(spark, good, s"$path/good")
    write(spark, bad, s"$path/bad")
  }

  /** Wraps each value in a single-column row and writes the result as a Parquet file. */
  def write(spark: SparkSession, data: Iterator[Any], path: String): Unit = {
    val schema = StructType(Array(StructField("foo", IntegerType, true)))
    val rows = data.map(row => new GenericRowWithSchema(Array(row), schema): Row).toSeq
    val rdd = spark.sparkContext.parallelize(rows)
    spark.sqlContext.createDataFrame(rdd, schema).write.parquet(path)
  }
}
