Hi,

It seems that something changed between Spark 1.6.2 and 2.0.0 that I wasn't 
expecting.

If I have a DataFrame with records sorted within each partition, and I write it
to parquet and read it back, previously the records would be iterated through in
the same order they were written (assuming no shuffle has taken place). That no
longer seems to be the case. Below is the code to reproduce in a spark-shell.

Was this change expected?

Thanks,
Jason.


import org.apache.spark.sql._
// Returns true iff, within every partition, the rows are sorted by the mapped key.
def isSorted[T](self: DataFrame, mapping: Row => T)(implicit ordering: Ordering[T]) = {
  import self.sqlContext.implicits._
  import ordering._
  self
    .mapPartitions(rows => {
      val isSorted = rows
        .map(mapping)
        .sliding(2) // all adjacent pairs
        .forall {
          case Seq(x, y) => x <= y // every adjacent pair must be in order
          case _ => true           // zero or one row in the partition
        }

      Iterator(isSorted)
    })
    .reduce(_ && _)
}

// in Spark 2.0.0
spark.range(100000).toDF("id").registerTempTable("input")
spark.sql("SELECT id FROM input DISTRIBUTE BY id SORT BY 
id").write.mode("overwrite").parquet("input.parquet")
isSorted(spark.read.parquet("input.parquet"), _.getAs[Long]("id"))
// FALSE
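
// As a sanity check (my assumption being that SORT BY sorts the rows within
// each partition), running the same check on the DataFrame before the write
// should come back true, which would point at the read side rather than the write:
isSorted(spark.sql("SELECT id FROM input DISTRIBUTE BY id SORT BY id"), _.getAs[Long]("id"))
// expected TRUE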

// in Spark 1.6.2
sqlContext.range(100000).toDF("id").registerTempTable("input")
sqlContext.sql("SELECT id FROM input DISTRIBUTE BY id SORT BY 
id").write.mode("overwrite").parquet("input.parquet")
isSorted(sqlContext.read.parquet("input.parquet"), _.getAs[Long]("id"))
// TRUE
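
// If the old behaviour is gone for good, the workaround I'd fall back to is to
// re-sort after the read (an extra in-partition sort, but it doesn't depend on
// the order parquet hands the rows back), e.g. in 2.0.0:
isSorted(spark.read.parquet("input.parquet").sortWithinPartitions("id"), _.getAs[Long]("id"))
// expected TRUE by construction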
