[ 
https://issues.apache.org/jira/browse/SPARK-12052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-12052:
---------------------------------
    Labels: bulk-closed  (was: )

> DataFrame with self-join fails unless toDF() column aliases provided
> --------------------------------------------------------------------
>
>                 Key: SPARK-12052
>                 URL: https://issues.apache.org/jira/browse/SPARK-12052
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.5.1, 1.5.2, 1.6.0
>         Environment: spark-shell for Spark 1.5.1, 1.5.2, and 1.6.0-Preview2
>            Reporter: Dean Wampler
>            Priority: Major
>              Labels: bulk-closed
>
> Joining with the same DF twice appears to match on the wrong column unless 
> the columns in the results of the first join are aliased with "toDF". Here is 
> an example program:
> {code}
> val rdd = sc.parallelize(2 to 100, 1).cache
> val numbers = rdd.map(i => (i, i*i)).toDF("n", "nsq")
> val names   = rdd.map(i => (i, i.toString)).toDF("id", "name")
> numbers.show
> names.show
> val good = numbers.
>   join(names, numbers("n") === names("id")).toDF("n", "nsq", "id1", "name1").
>   join(names, $"nsq" === names("id")).toDF("n", "nsq", "id1", "name1", "id2", 
> "name2")
> // The last toDF can be omitted and you'll still get valid results.
> good.printSchema
> // root
> //  |-- n: integer (nullable = false)
> //  |-- nsq: integer (nullable = false)
> //  |-- id1: integer (nullable = false)
> //  |-- name1: string (nullable = true)
> //  |-- id2: integer (nullable = false)
> //  |-- name2: string (nullable = true)
> good.count
> // res3: Long = 9
> good.show
> // +---+---+---+-----+---+-----+
> // |  n|nsq|id1|name1|id2|name2|
> // +---+---+---+-----+---+-----+
> // |  2|  4|  2|    2|  4|    4|
> // |  4| 16|  4|    4| 16|   16|
> // |  6| 36|  6|    6| 36|   36|
> // |  8| 64|  8|    8| 64|   64|
> // | 10|100| 10|   10|100|  100|
> // |  3|  9|  3|    3|  9|    9|
> // |  5| 25|  5|    5| 25|   25|
> // |  7| 49|  7|    7| 49|   49|
> // |  9| 81|  9|    9| 81|   81|
> // +---+---+---+-----+---+-----+
> val bad = numbers.
>   join(names, numbers("n") === names("id")).
>   join(names, $"nsq" === names("id"))
> bad.printSchema
> // root
> //  |-- n: integer (nullable = false)
> //  |-- nsq: integer (nullable = false)
> //  |-- id: integer (nullable = false)
> //  |-- name: string (nullable = true)
> //  |-- id: integer (nullable = false)
> //  |-- name: string (nullable = true)
> bad.count
> // res6: Long = 0
> bad.show
> // +---+---+---+----+---+----+
> // |  n|nsq| id|name| id|name|
> // +---+---+---+----+---+----+
> // +---+---+---+----+---+----+
> // Curiously, if you change the original rdd line to this:
> //   val rdd = sc.parallelize(1 to 100, 1).cache
> // the first record in numbers is (1,1), and bad then has the following
> // content:
> // +---+---+---+----+---+----+
> // |  n|nsq| id|name| id|name|
> // +---+---+---+----+---+----+
> // |  1|  1|  1|   1|  1|   1|
> // |  1|  1|  1|   1|  2|   2|
> // |  1|  1|  1|   1|  3|   3|
> // |  1|  1|  1|   1|  4|   4|
> // |  1|  1|  1|   1|  5|   5|
> // |  1|  1|  1|   1|  6|   6|
> // |  1|  1|  1|   1|  7|   7|
> // |  1|  1|  1|   1|  8|   8|
> // |  1|  1|  1|   1|  9|   9|
> // |  1|  1|  1|   1| 10|  10|
> // |  1|  1|  1|   1| 11|  11|
> // |  1|  1|  1|   1| 12|  12|
> // |  1|  1|  1|   1| 13|  13|
> // |  1|  1|  1|   1| 14|  14|
> // |  1|  1|  1|   1| 15|  15|
> // |  1|  1|  1|   1| 16|  16|
> // |  1|  1|  1|   1| 17|  17|
> // |  1|  1|  1|   1| 18|  18|
> // |  1|  1|  1|   1| 19|  19|
> // |  1|  1|  1|   1| 20|  20|
> // ...
> // |  1|  1|  1|   1| 96|  96|
> // |  1|  1|  1|   1| 97|  97|
> // |  1|  1|  1|   1| 98|  98|
> // |  1|  1|  1|   1| 99|  99|
> // |  1|  1|  1|   1|100| 100|
> // +---+---+---+----+---+----+
> //
> // This makes no sense to me.
> // Breaking the join into two steps, so we can reference bad2("nsq"), doesn't help:
> val bad2 = numbers.
>   join(names, numbers("n") === names("id"))
> val bad3 = bad2.
>   join(names, bad2("nsq") === names("id"))
> bad3.printSchema
> bad3.count
> bad3.show
> {code}
> Note the embedded comment: if you start with {{1 to 100}}, you get a record 
> in {{numbers}} with two {{1}} values. This yields the strange results shown 
> in the comment, which suggests that the join was actually done on the wrong 
> column of the first result set. Worse, the output makes no sense at all: 
> given the results of the first join alone, it should be impossible to 
> produce this output!
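> A common workaround (a sketch only, not verified against the 1.5.x analyzer) 
> is to give each side of the self-join its own alias with {{as}} and join on 
> qualified column references, so the two copies of {{names}} cannot be 
> confused:
> {code}
> // Alias each copy of names so their columns can be referenced unambiguously.
> val n1 = names.as("n1")
> val n2 = names.as("n2")
> val fixed = numbers.
>   join(n1, numbers("n") === $"n1.id").
>   join(n2, numbers("nsq") === $"n2.id")
> fixed.printSchema
> fixed.show
> {code}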
> Note: could be related to the following issues:
> * https://issues.apache.org/jira/browse/SPARK-10838 (I observed this behavior 
> while experimenting with this bug)
> * https://issues.apache.org/jira/browse/SPARK-11072
> * https://issues.apache.org/jira/browse/SPARK-10925



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
