Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19050#discussion_r137167260 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala --- @@ -875,4 +876,70 @@ class SubquerySuite extends QueryTest with SharedSQLContext { assert(e.message.contains("cannot resolve '`a`' given input columns: [t.i, t.j]")) } } + + test("SPARK-21835: Join in correlated subquery should be duplicateResolved: case 1") { + withTable("t1") { + withTempPath { path => + Seq(1 -> "a").toDF("i", "j").write.parquet(path.getCanonicalPath) + sql(s"CREATE TABLE t1 USING parquet LOCATION '${path.toURI}'") + + val sqlText = + """ + |SELECT * FROM t1 + |WHERE + |NOT EXISTS (SELECT * FROM t1) + """.stripMargin + val optimizedPlan = sql(sqlText).queryExecution.optimizedPlan + val join = optimizedPlan.collect { + case j: Join => j + }.head.asInstanceOf[Join] + assert(join.duplicateResolved) + assert(optimizedPlan.resolved) + } + } + } + + test("SPARK-21835: Join in correlated subquery should be duplicateResolved: case 2") { + withTable("t1", "t2", "t3") { + withTempPath { path => + val data = Seq((1, 1, 1), (2, 0, 2)) + + data.toDF("t1a", "t1b", "t1c").write.parquet(path.getCanonicalPath + "/t1") + data.toDF("t2a", "t2b", "t2c").write.parquet(path.getCanonicalPath + "/t2") + data.toDF("t3a", "t3b", "t3c").write.parquet(path.getCanonicalPath + "/t3") + + sql(s"CREATE TABLE t1 USING parquet LOCATION '${path.toURI}/t1'") + sql(s"CREATE TABLE t2 USING parquet LOCATION '${path.toURI}/t2'") + sql(s"CREATE TABLE t3 USING parquet LOCATION '${path.toURI}/t3'") + + val sqlText = + s""" + |SELECT * + |FROM (SELECT * + | FROM t2 + | WHERE t2c IN (SELECT t1c + | FROM t1 + | WHERE t1a = t2a) + | UNION + | SELECT * + | FROM t3 + | WHERE t3a IN (SELECT t2a + | FROM t2 + | UNION ALL + | SELECT t1a + | FROM t1 + | WHERE t1b > 0)) t4 + |WHERE t4.t2b IN (SELECT Min(t3b) + | FROM t3 + | WHERE t4.t2a = t3a) + """.stripMargin + val optimizedPlan = 
sql(sqlText).queryExecution.optimizedPlan + val joinNodes = optimizedPlan.collect { + case j: Join => j + }.map(_.asInstanceOf[Join]) + joinNodes.map(j => assert(j.duplicateResolved)) --- End diff -- ```scala val joinNodes = optimizedPlan.collect { case j: Join => j } joinNodes.foreach(j => assert(j.duplicateResolved)) ```
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org