Repository: spark
Updated Branches:
  refs/heads/master 99f5f9886 -> 1bc41125e


[SPARK-11500][SQL] Not deterministic order of columns when using merging schemas.

https://issues.apache.org/jira/browse/SPARK-11500

As filed in SPARK-11500, when schema merging is enabled, the order in which
files are touched matters and can affect the ordering of the output columns.

This was mostly due to the use of `Set` and `Map`, so I replaced them with
`LinkedHashSet` and `LinkedHashMap` to preserve insertion order.
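
For illustration, here is a minimal standalone sketch (not part of the patch;
the file names below are made up) of why the swap matters. A plain `Set` gives
no iteration-order guarantee once it grows beyond a handful of elements,
whereas `LinkedHashSet` always iterates in insertion order:

    import scala.collection.mutable

    // Hypothetical file names, purely for illustration.
    val names = Seq("part-r-00003", "part-r-00001", "part-r-00002",
                    "part-r-00005", "part-r-00004")

    val plainSet  = Set(names: _*)                    // iteration order unspecified
    val linkedSet = mutable.LinkedHashSet(names: _*)  // insertion order preserved

    println(plainSet.mkString(", "))   // may come out in any order
    println(linkedSet.mkString(", "))  // part-r-00003, part-r-00001, ..., part-r-00004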

Also, I changed `reduceOption` to `reduceLeftOption`, and changed the order of
`filesToTouch` from `metadataStatuses ++ commonMetadataStatuses ++ needMerged`
to `needMerged ++ metadataStatuses ++ commonMetadataStatuses`, so that the
part-files, which always have the schema in their footers, are touched first,
whereas the summary files might not exist at all.
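
As a rough sketch of the effect (using a hypothetical `Schema` case class in
place of Parquet's real schema type), folding strictly from the left over a
deterministically ordered file list keeps the columns of the first file ahead
of columns that only appear in later files:

    // Hypothetical stand-in for a schema with named columns.
    case class Schema(fields: Seq[String]) {
      def merge(other: Schema): Schema =
        Schema(fields ++ other.fields.filterNot(fields.contains))
    }

    val footerSchemas =
      Seq(Schema(Seq("a", "b")), Schema(Seq("c", "b")), Schema(Seq("d", "b")))

    // reduceLeftOption folds left-to-right, so the merged column order is
    // deterministic as long as the input sequence order is deterministic.
    val merged = footerSchemas.reduceLeftOption(_ merge _)
    println(merged)  // Some(Schema(List(a, b, c, d)))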

One nit: if schema merging is not enabled but multiple files are given, there
is still no guarantee of the output column order, since there might not be a
summary file for the first file, which ends up putting the columns of the
other files ahead.

However, I thought this should be okay, since disabling schema merging
assumes that all the files have the same schema.

In addition, the test code for this change only checks the field names.

Author: hyukjinkwon <gurwls...@gmail.com>

Closes #9517 from HyukjinKwon/SPARK-11500.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1bc41125
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1bc41125
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1bc41125

Branch: refs/heads/master
Commit: 1bc41125ee6306e627be212969854f639969c440
Parents: 99f5f98
Author: hyukjinkwon <gurwls...@gmail.com>
Authored: Wed Nov 11 16:46:04 2015 +0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Wed Nov 11 16:46:04 2015 +0800

----------------------------------------------------------------------
 .../datasources/parquet/ParquetRelation.scala   | 29 ++++++++++++++++----
 .../apache/spark/sql/sources/interfaces.scala   | 22 ++++++++-------
 .../sources/ParquetHadoopFsRelationSuite.scala  | 21 +++++++++++++-
 3 files changed, 55 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1bc41125/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 5a7c6b9..21337b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -383,7 +383,7 @@ private[sql] class ParquetRelation(
     var schema: StructType = _
 
     // Cached leaves
-    var cachedLeaves: Set[FileStatus] = null
+    var cachedLeaves: mutable.LinkedHashSet[FileStatus] = null
 
     /**
      * Refreshes `FileStatus`es, footers, partition spec, and table schema.
@@ -396,13 +396,13 @@ private[sql] class ParquetRelation(
         !cachedLeaves.equals(currentLeafStatuses)
 
       if (leafStatusesChanged) {
-        cachedLeaves = currentLeafStatuses.toIterator.toSet
+        cachedLeaves = currentLeafStatuses
 
         // Lists `FileStatus`es of all leaf nodes (files) under all base directories.
         val leaves = currentLeafStatuses.filter { f =>
           isSummaryFile(f.getPath) ||
             !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
-        }.toArray
+        }.toArray.sortBy(_.getPath.toString)
 
         dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
         metadataStatuses =
@@ -465,13 +465,30 @@ private[sql] class ParquetRelation(
           // You should enable this configuration only if you are very sure that for the parquet
           // part-files to read there are corresponding summary files containing correct schema.
 
+          // As filed in SPARK-11500, the order of files to touch is a matter, which might affect
+          // the ordering of the output columns. There are several things to mention here.
+          //
+          //  1. If mergeRespectSummaries config is false, then it merges schemas by reducing from
+          //     the first part-file so that the columns of the lexicographically first file show
+          //     first.
+          //
+          //  2. If mergeRespectSummaries config is true, then there should be, at least,
+          //     "_metadata"s for all given files, so that we can ensure the columns of
+          //     the lexicographically first file show first.
+          //
+          //  3. If shouldMergeSchemas is false, but when multiple files are given, there is
+          //     no guarantee of the output order, since there might not be a summary file for the
+          //     lexicographically first file, which ends up putting ahead the columns of
+          //     the other files. However, this should be okay since not enabling
+          //     shouldMergeSchemas means (assumes) all the files have the same schemas.
+
           val needMerged: Seq[FileStatus] =
             if (mergeRespectSummaries) {
               Seq()
             } else {
               dataStatuses
             }
-          (metadataStatuses ++ commonMetadataStatuses ++ needMerged).toSeq
+          needMerged ++ metadataStatuses ++ commonMetadataStatuses
         } else {
           // Tries any "_common_metadata" first. Parquet files written by old versions or Parquet
           // don't have this.
@@ -768,10 +785,10 @@ private[sql] object ParquetRelation extends Logging {
 
           footers.map { footer =>
             ParquetRelation.readSchemaFromFooter(footer, converter)
-          }.reduceOption(_ merge _).iterator
+          }.reduceLeftOption(_ merge _).iterator
         }.collect()
 
-    partiallyMergedSchemas.reduceOption(_ merge _)
+    partiallyMergedSchemas.reduceLeftOption(_ merge _)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/1bc41125/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index e296d63..5b8841b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -428,11 +428,11 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
   private var _partitionSpec: PartitionSpec = _
 
   private class FileStatusCache {
-    var leafFiles = mutable.Map.empty[Path, FileStatus]
+    var leafFiles = mutable.LinkedHashMap.empty[Path, FileStatus]
 
     var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]
 
-    private def listLeafFiles(paths: Array[String]): Set[FileStatus] = {
+    private def listLeafFiles(paths: Array[String]): mutable.LinkedHashSet[FileStatus] = {
       if (paths.length >= sqlContext.conf.parallelPartitionDiscoveryThreshold) {
         HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sqlContext.sparkContext)
       } else {
@@ -450,10 +450,11 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
 
         val (dirs, files) = statuses.partition(_.isDir)
 
+        // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
         if (dirs.isEmpty) {
-          files.toSet
+          mutable.LinkedHashSet(files: _*)
         } else {
-          files.toSet ++ listLeafFiles(dirs.map(_.getPath.toString))
+          mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath.toString))
         }
       }
     }
@@ -464,7 +465,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
       leafFiles.clear()
       leafDirToChildrenFiles.clear()
 
-      leafFiles ++= files.map(f => f.getPath -> f).toMap
+      leafFiles ++= files.map(f => f.getPath -> f)
       leafDirToChildrenFiles ++= files.toArray.groupBy(_.getPath.getParent)
     }
   }
@@ -475,8 +476,8 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
     cache
   }
 
-  protected def cachedLeafStatuses(): Set[FileStatus] = {
-    fileStatusCache.leafFiles.values.toSet
+  protected def cachedLeafStatuses(): mutable.LinkedHashSet[FileStatus] = {
+    mutable.LinkedHashSet(fileStatusCache.leafFiles.values.toArray: _*)
   }
 
   final private[sql] def partitionSpec: PartitionSpec = {
@@ -834,7 +835,7 @@ private[sql] object HadoopFsRelation extends Logging {
   def listLeafFilesInParallel(
       paths: Array[String],
       hadoopConf: Configuration,
-      sparkContext: SparkContext): Set[FileStatus] = {
+      sparkContext: SparkContext): mutable.LinkedHashSet[FileStatus] = {
     logInfo(s"Listing leaf files and directories in parallel under: 
${paths.mkString(", ")}")
 
     val serializableConfiguration = new SerializableConfiguration(hadoopConf)
@@ -854,9 +855,10 @@ private[sql] object HadoopFsRelation extends Logging {
         status.getAccessTime)
     }.collect()
 
-    fakeStatuses.map { f =>
+    val hadoopFakeStatuses = fakeStatuses.map { f =>
       new FileStatus(
         f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, new Path(f.path))
-    }.toSet
+    }
+    mutable.LinkedHashSet(hadoopFakeStatuses: _*)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1bc41125/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
index e2d754e..e866493 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
@@ -23,7 +23,7 @@ import com.google.common.io.Files
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.sql.{execution, AnalysisException, SaveMode}
+import org.apache.spark.sql._
 import org.apache.spark.sql.types._
 
 
@@ -155,4 +155,23 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
       assert(physicalPlan.collect { case p: execution.Filter => p }.length === 1)
     }
   }
+
+  test("SPARK-11500: Not deterministic order of columns when using merging 
schemas.") {
+    import testImplicits._
+    withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val pathOne = s"${dir.getCanonicalPath}/part=1"
+        Seq(1, 1).zipWithIndex.toDF("a", "b").write.parquet(pathOne)
+        val pathTwo = s"${dir.getCanonicalPath}/part=2"
+        Seq(1, 1).zipWithIndex.toDF("c", "b").write.parquet(pathTwo)
+        val pathThree = s"${dir.getCanonicalPath}/part=3"
+        Seq(1, 1).zipWithIndex.toDF("d", "b").write.parquet(pathThree)
+
+        // The schema consists of the leading columns of the first part-file
+        // in the lexicographic order.
+        assert(sqlContext.read.parquet(dir.getCanonicalPath).schema.map(_.name)
+          === Seq("a", "b", "c", "d", "part"))
+      }
+    }
+  }
 }

