Hi Enrico,

Using Spark version 3.1.3 and turning AQE off seems to fix the sorting. I am looking into why. Do you have any thoughts?

Thanks,
Swetha

On Sat, Sep 17, 2022 at 1:58 PM Enrico Minack <i...@enrico.minack.dev> wrote:

> Hi,
>
> From a quick glance over your transformations, sortCol should be sorted.
>
> Are you using Spark 3.2 or above? Can you try again with AQE turned off in
> that case?
>
> https://spark.apache.org/docs/latest/sql-performance-tuning.html#adaptive-query-execution
>
> Enrico
>
>
> On 16.09.22 at 23:28, Swetha Baskaran wrote:
>
> Hi Enrico,
>
> Thank you for your response!
> Could you clarify what you mean by *values for "col1" will be "randomly"
> allocated to partition files*?
>
> We observe one file per partition, however we see an alternating pattern
> of unsorted rows in some files.
> Here is the code used and the unsorted pattern observed in the output
> files.
>
>     df
>       .repartition(col("day"), col("month"), col("year"))
>       .withColumn("partitionId", spark_partition_id)
>       .withColumn("monotonicallyIncreasingIdUnsorted", monotonicallyIncreasingId)
>       .sortWithinPartitions("year", "month", "day", "sortCol")
>       .withColumn("monotonicallyIncreasingIdSorted", monotonicallyIncreasingId)
>       .write
>       .partitionBy("year", "month", "day")
>       .parquet(path)
>
> 1
> +-------+-----------+---------------------------------+-------------------------------+
> |sortCol|partitionId|monotonicallyIncreasingIdUnsorted|monotonicallyIncreasingIdSorted|
> +-------+-----------+---------------------------------+-------------------------------+
> | 100000|        732|                    6287832121344|                  6287832121344|
> |1170583|        732|                    6287842137820|                  6287876860586|
> | 100000|        732|                    6287879216173|                  6287832121345|
> |1170583|        732|                    6287890351126|                  6287876860587|
> | 100000|        732|                    6287832569336|                  6287832121346|
> |1170583|        732|                    6287843957457|                  6287876860588|
> | 100000|        732|                    6287881576840|                  6287832121347|
> |1170583|        732|                    6287892533054|                  6287876860589|
> | 100000|        732|                    6287833244394|                  6287832121348|
> |1170583|        732|                    6287847669077|                  6287876860590|
> | 100000|        732|                    6287884414741|                  6287832121349|
> |1170583|        732|                    6287894723328|                  6287876860591|
> | 100000|        732|                    6287833768679|                  6287832121350|
> |1170583|        732|                    6287849212375|                  6287876860592|
> | 100000|        732|                    6287885330261|                  6287832121351|
> |1170583|        732|                    6287896605691|                  6287876860593|
> | 100000|        732|                    6287835089415|                  6287832121352|
> |1170583|        732|                    6287851414977|                  6287876860594|
> | 100000|        732|                    6287886356164|                  6287832121353|
> |1170583|        732|                    6287899702397|                  6287876860595|
> +-------+-----------+---------------------------------+-------------------------------+
>
> 2
> +-------+-----------+---------------------------------+-------------------------------+
> |sortCol|partitionId|monotonicallyIncreasingIdUnsorted|monotonicallyIncreasingIdSorted|
> +-------+-----------+---------------------------------+-------------------------------+
> | 100000|        136|                    1168231104512|                  1168231104512|
> |1215330|        136|                    1168267800695|                  1168275843754|
> | 100000|        136|                    1168365908174|                  1168231104513|
> |1215330|        136|                    1168272121474|                  1168275843755|
> | 100000|        136|                    1168233930111|                  1168231104514|
> |1215330|        136|                    1168275020862|                  1168275843756|
> | 100000|        136|                    1168369592448|                  1168231104515|
> |1215331|        136|                    1168320722989|                  1168275843757|
> | 100000|        136|                    1168235423908|                  1168231104516|
> |1215331|        136|                    1168232219843|                  1168275843758|
> | 100000|        136|                    1168276450874|                  1168231104517|
> |1215331|        136|                    1168330171556|                  1168275843759|
> | 100000|        136|                    1168239878974|                  1168231104518|
> |1215331|        136|                    1168235045442|                  1168275843760|
> | 100000|        136|                    1168287069249|                  1168231104519|
> |1215331|        136|                    1168331936649|                  1168275843761|
> | 100000|        136|                    1168246605999|                  1168231104520|
> |1215331|        136|                    1168236539239|                  1168275843762|
> | 100000|        136|                    1168289197499|                  1168231104521|
> |1215331|        136|                    1168337136110|                  1168275843763|
> +-------+-----------+---------------------------------+-------------------------------+
>
> 3
> +-------+-----------+---------------------------------+-------------------------------+
> |sortCol|partitionId|monotonicallyIncreasingIdUnsorted|monotonicallyIncreasingIdSorted|
> +-------+-----------+---------------------------------+-------------------------------+
> | 100000|        581|                    4990751997952|                  4990751997952|
> |1207875|        581|                    4990829438530|                  4990796737194|
> | 100000|        581|                    4990797772249|                  4990751997953|
> |1207875|        581|                    4990789773711|                  4990796737195|
> | 100000|        581|                    4990754836237|                  4990751997954|
> |1207875|        581|                    4990792883763|                  4990796737196|
> | 100000|        581|                    4990799663372|                  4990751997955|
> |1207875|        581|                    4990795135016|                  4990796737197|
> | 100000|        581|                    4990754889999|                  4990751997956|
> |1207875|        581|                    4990796258628|                  4990796737198|
> | 100000|        581|                    4990801912980|                  4990751997957|
> |1207876|        581|                    4990798880125|                  4990796737199|
> | 100000|        581|                    4990755328908|                  4990751997958|
> |1207876|        581|                    4990753105828|                  4990796737200|
> | 100000|        581|                    4990804520539|                  4990751997959|
> |1207876|        581|                    4990800771248|                  4990796737201|
> | 100000|        581|                    4990756046653|                  4990751997960|
> |1207876|        581|                    4990757154529|                  4990796737202|
> | 100000|        581|                    4990806212169|                  4990751997961|
> |1207876|        581|                    4990803020856|                  4990796737203|
> +-------+-----------+---------------------------------+-------------------------------+
>
> Thanks,
> Swetha
>
>
> On Fri, Sep 16, 2022 at 1:45 AM Enrico Minack <i...@enrico.minack.dev>
> wrote:
>
>> Yes, you can expect each partition file to be sorted by "col1" and "col2".
>>
>> However, values for "col1" will be "randomly" allocated to partition
>> files, but all rows with the same value for "col1" will reside in the same
>> one partition file.
>>
>> What kind of unexpected sort order do you observe?
>>
>> Enrico
>>
>>
>> On 16.09.22 at 05:42, Swetha Baskaran wrote:
>>
>> Hi!
>>
>> We expected the order of sorted partitions to be preserved after a
>> dataframe write. We use the following code to write out one file per
>> partition, with the rows sorted by a column.
>>
>>     df
>>       .repartition($"col1")
>>       .sortWithinPartitions("col1", "col2")
>>       .write
>>       .partitionBy("col1")
>>       .csv(path)
>>
>> However we observe unexpected sort order in some files. Does spark
>> guarantee sort order within partitions on write?
>>
>>
>> Thanks,
>> swebask
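
For reference, the workaround reported at the top of this thread (turning AQE off before the sorted write) could be sketched in Scala roughly as follows. This is a minimal sketch under stated assumptions, not the posters' actual job: the object name, app name, local master, toy rows, and output path are hypothetical and only there to make the snippet self-contained. `spark.sql.adaptive.enabled` is the setting described on the performance tuning page linked above, and the transformation chain mirrors the one quoted in the thread.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object SortedPartitionWrite {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("sorted-partition-write")             // hypothetical name
      .master("local[*]")                            // assumption: local test run
      .config("spark.sql.adaptive.enabled", "false") // turn AQE off for this session
      .getOrCreate()
    import spark.implicits._

    // Toy rows standing in for the real data (assumption).
    val df = Seq(
      (2022, 9, 16, 1170583L),
      (2022, 9, 16, 100000L),
      (2022, 9, 17, 1215330L),
      (2022, 9, 17, 100000L)
    ).toDF("year", "month", "day", "sortCol")

    val outputPath = "/tmp/sorted-partition-write"   // hypothetical path

    // Same shape as the pipeline in the thread: one shuffle partition per
    // (year, month, day), sorted by sortCol, written as one directory each.
    df.repartition(col("year"), col("month"), col("day"))
      .sortWithinPartitions("year", "month", "day", "sortCol")
      .write
      .mode("overwrite")
      .partitionBy("year", "month", "day")
      .parquet(outputPath)

    spark.stop()
  }
}
```

On an existing session the same switch can be flipped with `spark.conf.set("spark.sql.adaptive.enabled", "false")` before the write. The thread only reports that this seemed to fix the ordering; it does not establish why, so treat the setting as a diagnostic step and not a confirmed root cause.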