Github user tejasapatil commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14864#discussion_r77118552

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
    @@ -156,24 +156,56 @@ case class FileSourceScanExec(
         false
       }

    -  override val outputPartitioning: Partitioning = {
    +  override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
         val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
           relation.bucketSpec
         } else {
           None
         }
    -    bucketSpec.map { spec =>
    -      val numBuckets = spec.numBuckets
    -      val bucketColumns = spec.bucketColumnNames.flatMap { n =>
    -        output.find(_.name == n)
    -      }
    -      if (bucketColumns.size == spec.bucketColumnNames.size) {
    -        HashPartitioning(bucketColumns, numBuckets)
    -      } else {
    -        UnknownPartitioning(0)
    -      }
    -    }.getOrElse {
    -      UnknownPartitioning(0)
    +    bucketSpec match {
    +      case Some(spec) =>
    +        val numBuckets = spec.numBuckets
    +        val bucketColumns = spec.bucketColumnNames.flatMap { n =>
    +          output.find(_.name == n)
    +        }
    +        if (bucketColumns.size == spec.bucketColumnNames.size) {
    +          val partitioning = HashPartitioning(bucketColumns, numBuckets)
    +
    +          val sortOrder = if (spec.sortColumnNames.nonEmpty) {
    +            // In case of bucketing, its possible to have multiple files belonging to the
    +            // same bucket in a given relation. Each of these files are locally sorted
    +            // but those files combined together are not globally sorted. Given that,
    +            // the RDD partition will not be sorted even if the relation has sort columns set
    +            // Current solution is to check if all the buckets have a single file in it
    +
    +            val files =
    +              relation.location.listFiles(partitionFilters).flatMap(partition => partition.files)
    +            val bucketToFilesGrouping =
    +              files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
    +            val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)
    --- End diff --

    For the sorting problem, one way to fix it would be to do what Hive does: create a single file per bucket. With any other approach there would be multiple files per bucket, so one would have to sort them globally while reading them. That is sub-optimal because tables tend to be "write once, read many", so spending more CPU once on the write path to generate a single file would be better.

    When I came across this, I wondered why it was designed this way. I even posted about it to the dev group earlier today: http://apache-spark-developers-list.1001551.n3.nabble.com/Questions-about-bucketing-in-Spark-td18814.html

    To give you some context, I am trying to drive adoption of Spark within Facebook. We have a lot of tables which would benefit from full bucketing support, so my high-level goal is to get Spark's bucketing on par with Hive's in terms of features and compatibility.
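
To make the single-file-per-bucket check concrete, here is a minimal standalone sketch in plain Scala with no Spark dependency. It is not the code from the pull request: `bucketIdOf` is a hypothetical stand-in for `BucketingUtils.getBucketId`, and the file-name pattern (bucket id as the digits after the last underscore) and the sample file names are assumptions made for illustration. It also shows why two locally sorted files in one bucket are not sorted once concatenated.

```scala
object BucketSortCheck {
  // Hypothetical stand-in for BucketingUtils.getBucketId. Assumes the bucket id
  // is the run of digits after the last underscore, before any file extension.
  private val BucketedName = """.*_(\d+)(?:\..*)?$""".r

  def bucketIdOf(fileName: String): Option[Int] = fileName match {
    case BucketedName(id) => Some(id.toInt)
    case _ => None
  }

  // True when no bucket is backed by more than one file, in which case a
  // per-file sort is also a per-bucket sort.
  def everyBucketHasSingleFile(fileNames: Seq[String]): Boolean =
    fileNames.groupBy(bucketIdOf).forall { case (_, files) => files.length <= 1 }

  // True when every element is <= its successor.
  def isSorted(xs: Seq[Int]): Boolean =
    xs.zip(xs.drop(1)).forall { case (a, b) => a <= b }

  def main(args: Array[String]): Unit = {
    // Sample names are made up for this sketch.
    val oneFilePerBucket = Seq("part-00000_00000.parquet", "part-00001_00001.parquet")
    val twoFilesInBucket0 = oneFilePerBucket :+ "part-00002_00000.parquet"
    println(everyBucketHasSingleFile(oneFilePerBucket))
    println(everyBucketHasSingleFile(twoFilesInBucket0))

    // Two files of the same bucket, each sorted on its own.
    val fileA = Seq(1, 4, 9)
    val fileB = Seq(2, 3, 7)
    println(isSorted(fileA ++ fileB))          // reading the files back to back
    println(isSorted((fileA ++ fileB).sorted)) // after an extra sort at read time
  }
}
```

The extra sort in the last line is the read-time cost that writing a single file per bucket would avoid.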