Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19050#discussion_r137167260
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala ---
    @@ -875,4 +876,70 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
           assert(e.message.contains("cannot resolve '`a`' given input columns: [t.i, t.j]"))
         }
       }
    +
    +  test("SPARK-21835: Join in correlated subquery should be duplicateResolved: case 1") {
    +    withTable("t1") {
    +      withTempPath { path =>
    +        Seq(1 -> "a").toDF("i", "j").write.parquet(path.getCanonicalPath)
    +        sql(s"CREATE TABLE t1 USING parquet LOCATION '${path.toURI}'")
    +
    +        val sqlText =
    +          """
    +            |SELECT * FROM t1
    +            |WHERE
    +            |NOT EXISTS (SELECT * FROM t1)
    +          """.stripMargin
    +        val optimizedPlan = sql(sqlText).queryExecution.optimizedPlan
    +        val join = optimizedPlan.collect {
    +          case j: Join => j
    +        }.head.asInstanceOf[Join]
    +        assert(join.duplicateResolved)
    +        assert(optimizedPlan.resolved)
    +      }
    +    }
    +  }
    +
    +  test("SPARK-21835: Join in correlated subquery should be duplicateResolved: case 2") {
    +    withTable("t1", "t2", "t3") {
    +      withTempPath { path =>
    +        val data = Seq((1, 1, 1), (2, 0, 2))
    +
    +        data.toDF("t1a", "t1b", "t1c").write.parquet(path.getCanonicalPath + "/t1")
    +        data.toDF("t2a", "t2b", "t2c").write.parquet(path.getCanonicalPath + "/t2")
    +        data.toDF("t3a", "t3b", "t3c").write.parquet(path.getCanonicalPath + "/t3")
    +
    +        sql(s"CREATE TABLE t1 USING parquet LOCATION '${path.toURI}/t1'")
    +        sql(s"CREATE TABLE t2 USING parquet LOCATION '${path.toURI}/t2'")
    +        sql(s"CREATE TABLE t3 USING parquet LOCATION '${path.toURI}/t3'")
    +
    +        val sqlText =
    +          s"""
    +             |SELECT *
    +             |FROM   (SELECT *
    +             |        FROM   t2
    +             |        WHERE  t2c IN (SELECT t1c
    +             |                       FROM   t1
    +             |                       WHERE  t1a = t2a)
    +             |        UNION
    +             |        SELECT *
    +             |        FROM   t3
    +             |        WHERE  t3a IN (SELECT t2a
    +             |                       FROM   t2
    +             |                       UNION ALL
    +             |                       SELECT t1a
    +             |                       FROM   t1
    +             |                       WHERE  t1b > 0)) t4
    +             |WHERE  t4.t2b IN (SELECT Min(t3b)
    +             |                          FROM   t3
    +             |                          WHERE  t4.t2a = t3a)
    +           """.stripMargin
    +        val optimizedPlan = sql(sqlText).queryExecution.optimizedPlan
    +        val joinNodes = optimizedPlan.collect {
    +          case j: Join => j
    +        }.map(_.asInstanceOf[Join])
    +        joinNodes.map(j => assert(j.duplicateResolved))
    --- End diff --
    
    ```Scala
            val joinNodes = optimizedPlan.collect { case j: Join => j }
            joinNodes.foreach(j => assert(j.duplicateResolved))
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to