Hi,

We have a cluster running Apache Spark 2.0 on Hadoop 2.7.2 and CentOS 7.2. We had written some new code using the Spark DataFrame/Dataset APIs, but we are noticing incorrect results from a join after writing the data to Windows Azure Storage Blobs (the default HDFS location) and then reading it back. I've been able to reproduce the issue with the following snippet of code running on the cluster:
case class UserDimensions(user: Long, dimension: Long, score: Double)
case class CentroidClusterScore(dimension: Long, cluster: Int, score: Double)

val dims = sc.parallelize(Array(UserDimensions(12345, 0, 1.0))).toDS
val cent = sc.parallelize(Array(
  CentroidClusterScore(0, 1, 1.0),
  CentroidClusterScore(1, 0, 1.0),
  CentroidClusterScore(2, 2, 1.0))).toDS

dims.show
cent.show
dims.join(cent, dims("dimension") === cent("dimension")).show

outputs

+-----+---------+-----+
| user|dimension|score|
+-----+---------+-----+
|12345|        0|  1.0|
+-----+---------+-----+

+---------+-------+-----+
|dimension|cluster|score|
+---------+-------+-----+
|        0|      1|  1.0|
|        1|      0|  1.0|
|        2|      2|  1.0|
+---------+-------+-----+

+-----+---------+-----+---------+-------+-----+
| user|dimension|score|dimension|cluster|score|
+-----+---------+-----+---------+-------+-----+
|12345|        0|  1.0|        0|      1|  1.0|
+-----+---------+-----+---------+-------+-----+

which is correct. However, after writing the data out and reading it back in, we see this:

dims.write.mode("overwrite").save("/tmp/dims2.parquet")
cent.write.mode("overwrite").save("/tmp/cent2.parquet")

val dims2 = spark.read.load("/tmp/dims2.parquet").as[UserDimensions]
val cent2 = spark.read.load("/tmp/cent2.parquet").as[CentroidClusterScore]

dims2.show
cent2.show
dims2.join(cent2, dims2("dimension") === cent2("dimension")).show

outputs

+-----+---------+-----+
| user|dimension|score|
+-----+---------+-----+
|12345|        0|  1.0|
+-----+---------+-----+

+---------+-------+-----+
|dimension|cluster|score|
+---------+-------+-----+
|        0|      1|  1.0|
|        1|      0|  1.0|
|        2|      2|  1.0|
+---------+-------+-----+

+-----+---------+-----+---------+-------+-----+
| user|dimension|score|dimension|cluster|score|
+-----+---------+-----+---------+-------+-----+
|12345|        0|  1.0|     null|   null| null|
+-----+---------+-----+---------+-------+-----+

The right-hand side of the join now comes back as all nulls. However, joining the same data through the RDD API produces the correct result:

dims2.rdd.map( row => (row.dimension, row) ).join( cent2.rdd.map( row => (row.dimension, row) ) ).take(5)

res5: Array[(Long, (UserDimensions, CentroidClusterScore))] = Array((0,(UserDimensions(12345,0,1.0),CentroidClusterScore(0,1,1.0))))

We've tried changing the output format to ORC instead of Parquet, but we see the same results. Running Spark 2.0 locally, not on a cluster, does not have this issue, and running Spark in local mode on the master node of the Hadoop cluster also works. We only see the problem when running on top of YARN.

This also seems very similar to this issue: https://issues.apache.org/jira/browse/SPARK-10896

Thoughts?
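P.S. For completeness, this is roughly what the ORC variant of the round trip looked like (the /tmp paths and val names here are illustrative, not exactly what's in our job):

// Illustrative sketch of the ORC round trip we tried; paths and names are made up for this example.
dims.write.mode("overwrite").format("orc").save("/tmp/dims2.orc")
cent.write.mode("overwrite").format("orc").save("/tmp/cent2.orc")

val dimsOrc = spark.read.format("orc").load("/tmp/dims2.orc").as[UserDimensions]
val centOrc = spark.read.format("orc").load("/tmp/cent2.orc").as[CentroidClusterScore]

// Same join as above; on our cluster this also comes back with nulls on the right-hand side.
dimsOrc.join(centOrc, dimsOrc("dimension") === centOrc("dimension")).show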
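P.P.S. If it would help with diagnosis, we can capture and post the physical plans for the two joins, along the lines of:

// Sketch only: compare the plan of the in-memory join (correct result)
// with the plan of the parquet-backed join (null result) to see where they diverge.
dims.join(cent, dims("dimension") === cent("dimension")).explain(true)
dims2.join(cent2, dims2("dimension") === cent2("dimension")).explain(true)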