[ 
https://issues.apache.org/jira/browse/CARBONDATA-1076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chenerlu updated CARBONDATA-1076:
---------------------------------
    Description: 
This issue can be reproduced with the following steps:

Step1: create a carbon table
 
carbon.sql("CREATE TABLE IF NOT EXISTS carbon_table (col1 int, col2 int, col3 int) STORED BY 'carbondata' TBLPROPERTIES('DICTIONARY_INCLUDE'='col1,col2,col3','TABLE_BLOCKSIZE'='4')")
 
Step2: load data
carbon.sql("LOAD DATA LOCAL INPATH '/opt/carbon_table' INTO TABLE carbon_table")

The file carbon_table contains the following data:
col1,col2,col3
1,2,3
4,5,6
7,8,9
 
The carbon_table file is attached to this issue.
 
Step3: do the query
carbon.sql("SELECT c1.col1,c2.col1,c2.col3 FROM (SELECT col1,col2 FROM carbon_table GROUP BY col1,col2) c1 FULL JOIN (SELECT col1,count(col2) as col3 FROM carbon_table GROUP BY col1) c2 ON c1.col1 = c2.col1").show()

[expected] Hive and Parquet tables both return the following result, which is
correct:

|col1|col1|col3|
|   1|   1|   1|
|   4|   4|   1|
|   7|   7|   1|
 
[actually] Carbon returns nulls because the join fails to match rows:

|col1|col1|col3|
|   1|null|null|
|null|   4|   1|
|   4|null|null|
|null|   7|   1|
|   7|null|null|
|null|   1|   1|
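
For the three sample rows, a full outer join on col1 should pair every row
with its counterpart. The following is a minimal plain-Python model of the
Step 3 query, written only as a cross-check; it does not use Spark or
CarbonData, and the helper name full_outer_join is hypothetical.

```python
# Plain-Python model of the Step 3 query; no Spark or CarbonData involved.
rows = [(1, 2, 3), (4, 5, 6), (7, 8, 9)]  # contents of the sample file

# c1: SELECT col1, col2 FROM carbon_table GROUP BY col1, col2
c1 = sorted({(col1, col2) for col1, col2, _ in rows})

# c2: SELECT col1, count(col2) AS col3 FROM carbon_table GROUP BY col1
counts = {}
for col1, col2, _ in rows:
    counts[col1] = counts.get(col1, 0) + (col2 is not None)
c2 = sorted(counts.items())


def full_outer_join(left, right):
    """Full outer join on the first field; yields (c1.col1, c2.col1, c2.col3)."""
    out = []
    matched_right = set()
    for l in left:
        hits = [r for r in right if r[0] == l[0]]
        if hits:
            for r in hits:
                out.append((l[0], r[0], r[1]))
                matched_right.add(r)
        else:
            out.append((l[0], None, None))
    # Right rows that never matched are emitted with a null left side.
    for r in right:
        if r not in matched_right:
            out.append((None, r[0], r[1]))
    return out


result = full_outer_join(c1, c2)
```

In this model every col1 value finds its match, so the result has three rows
and no nulls.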

Root cause analysis:
 
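The query has two subqueries. One decodes the dictionary column before the
exchange that feeds the join, while the other decodes it after the exchange.
The two sides can therefore be hash-partitioned on different representations
of col1 (decoded values on one side, dictionary keys on the other), so equal
values may land in different partitions and the full join fails to match them.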
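
The suspected mechanism can be illustrated with a small plain-Python model.
This is a sketch under assumptions, not CarbonData code: the dictionary
contents are hypothetical (the real surrogate keys are not shown in this
report), and a simple modulo stands in for Spark's hash partitioner.

```python
NUM_PARTITIONS = 200  # matches hashpartitioning(..., 200) in the plan

# Hypothetical dictionary: decoded value -> surrogate key.
dictionary = {1: 2, 4: 3, 7: 4}
decode = {key: value for value, key in dictionary.items()}


def partition_of(x):
    # Stand-in for Spark's hash partitioner (which uses Murmur3, not modulo).
    return x % NUM_PARTITIONS


# One side: col1 is decoded BEFORE the exchange, so rows are placed by value.
left = {}
for value in (1, 4, 7):
    left.setdefault(partition_of(value), []).append(value)

# Other side: col1 is still a surrogate key at the exchange and is decoded
# only afterwards, so rows are placed by key but carry the decoded value.
right = {}
for value in (1, 4, 7):
    key = dictionary[value]
    right.setdefault(partition_of(key), []).append((decode[key], 1))


def join_partition(lefts, rights):
    """Full outer join of one partition on col1."""
    out, used = [], set()
    for l in lefts:
        hits = [r for r in rights if r[0] == l]
        out += [(l, r[0], r[1]) for r in hits] or [(l, None, None)]
        used.update(hits)
    out += [(None, r[0], r[1]) for r in rights if r not in used]
    return out


result = []
for p in sorted(set(left) | set(right)):
    result += join_partition(left.get(p, []), right.get(p, []))
```

Because each partition is joined independently, no value meets its counterpart
in this model, and every output row has a null on one side.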
 
My idea: can we move the decode before the exchange? I am not very familiar
with the Carbon query flow, so any other ideas are welcome.
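
To show why decoding before the exchange on both sides might help, here is a
small plain-Python comparison. It is only an illustrative sketch: the
dictionary values and the helper names partitions and matched_values are
hypothetical, and modulo stands in for Spark's real hash partitioning.

```python
NUM_PARTITIONS = 200
dictionary = {1: 2, 4: 3, 7: 4}  # hypothetical value -> surrogate key


def partitions(values, decode_first):
    """Map partition id -> set of decoded col1 values landing there."""
    placed = {}
    for value in values:
        # Shuffle on the decoded value, or on the surrogate key if the
        # decode happens only after the exchange.
        shuffle_key = value if decode_first else dictionary[value]
        placed.setdefault(shuffle_key % NUM_PARTITIONS, set()).add(value)
    return placed


def matched_values(left, right):
    """col1 values that meet their counterpart in the same partition."""
    return sorted(v for p in left for v in left[p] & right.get(p, set()))


values = [1, 4, 7]
# One side decoded before the exchange, the other after it.
mixed = matched_values(partitions(values, True), partitions(values, False))
# Both sides decoded before the exchange.
both_decoded = matched_values(partitions(values, True), partitions(values, True))
```

In this model the mixed case matches nothing, while decoding before the
exchange on both sides matches all three values. Shuffling both sides on the
surrogate key would also be consistent, provided both sides use the same
dictionary.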

The physical plan is as follows:
 
== Physical Plan ==
SortMergeJoin [col1#3445], [col1#3460], FullOuter
:- Sort [col1#3445 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(col1#3445, 200)
:     +- CarbonDictionaryDecoder [CarbonDecoderRelation(Map(col1#3445 -> 
col1#3445, col2#3446 -> col2#3446, col3#3447 -> 
col3#3447),CarbonDatasourceHadoopRelation [ Database name :tempdev, Table name 
:carbon_table, Schema :Some(StructType(StructField(col1,IntegerType,true), 
StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ]), 
CarbonDecoderRelation(Map(col1#3460 -> col1#3460, col2#3461 -> col2#3461, 
col3#3462 -> col3#3462),CarbonDatasourceHadoopRelation [ Database name 
:tempdev, Table name :carbon_table, Schema 
:Some(StructType(StructField(col1,IntegerType,true), 
StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ])], 
IncludeProfile(ArrayBuffer(col1#3445)), CarbonAliasDecoderRelation(), 
org.apache.spark.sql.CarbonSession@69e87cbe
:        +- HashAggregate(keys=[col1#3445, col2#3446], functions=[], 
output=[col1#3445])
:           +- Exchange hashpartitioning(col1#3445, col2#3446, 200)
:              +- HashAggregate(keys=[col1#3445, col2#3446], functions=[], 
output=[col1#3445, col2#3446])
:                 +- Scan CarbonDatasourceHadoopRelation [ Database name 
:tempdev, Table name :carbon_table, Schema 
:Some(StructType(StructField(col1,IntegerType,true), 
StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ] 
tempdev.carbon_table[col1#3445,col2#3446] 
+- Sort [col1#3460 ASC NULLS FIRST], false, 0
   +- CarbonDictionaryDecoder [CarbonDecoderRelation(Map(col1#3445 -> 
col1#3445, col2#3446 -> col2#3446, col3#3447 -> 
col3#3447),CarbonDatasourceHadoopRelation [ Database name :tempdev, Table name 
:carbon_table, Schema :Some(StructType(StructField(col1,IntegerType,true), 
StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ]), 
CarbonDecoderRelation(Map(col1#3460 -> col1#3460, col2#3461 -> col2#3461, 
col3#3462 -> col3#3462),CarbonDatasourceHadoopRelation [ Database name 
:tempdev, Table name :carbon_table, Schema 
:Some(StructType(StructField(col1,IntegerType,true), 
StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ])], 
IncludeProfile(ArrayBuffer(col1#3460)), CarbonAliasDecoderRelation(), 
org.apache.spark.sql.CarbonSession@69e87cbe
      +- HashAggregate(keys=[col1#3460], functions=[count(col2#3461)], 
output=[col1#3460, col3#3436L])
         +- Exchange hashpartitioning(col1#3460, 200)
            +- HashAggregate(keys=[col1#3460], 
functions=[partial_count(col2#3461)], output=[col1#3460, count#3472L])
               +- CarbonDictionaryDecoder [CarbonDecoderRelation(Map(col1#3445 
-> col1#3445, col2#3446 -> col2#3446, col3#3447 -> 
col3#3447),CarbonDatasourceHadoopRelation [ Database name :tempdev, Table name 
:carbon_table, Schema :Some(StructType(StructField(col1,IntegerType,true), 
StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ]), 
CarbonDecoderRelation(Map(col1#3460 -> col1#3460, col2#3461 -> col2#3461, 
col3#3462 -> col3#3462),CarbonDatasourceHadoopRelation [ Database name 
:tempdev, Table name :carbon_table, Schema 
:Some(StructType(StructField(col1,IntegerType,true), 
StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ])], 
IncludeProfile(ArrayBuffer(col2#3461)), CarbonAliasDecoderRelation(), 
org.apache.spark.sql.CarbonSession@69e87cbe
                  +- Scan CarbonDatasourceHadoopRelation [ Database name 
:tempdev, Table name :carbon_table, Schema 
:Some(StructType(StructField(col1,IntegerType,true), 
StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ] 
tempdev.carbon_table[col1#3460,col2#3461]]




> Join Issue caused by dictionary and shuffle exchange
> ----------------------------------------------------
>
>                 Key: CARBONDATA-1076
>                 URL: https://issues.apache.org/jira/browse/CARBONDATA-1076
>             Project: CarbonData
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.1.1-incubating, 1.1.0
>         Environment: Carbon + spark 2.1
>            Reporter: chenerlu
>            Assignee: Ravindra Pesala


