[ https://issues.apache.org/jira/browse/CARBONDATA-1076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16019445#comment-16019445 ]
chenerlu commented on CARBONDATA-1076:
--------------------------------------

Yes, I have reproduced this problem with a csv file. Data in the csv file is as follows:

col1,col2,col3
1,2,3
4,5,6
7,8,9

> Join Issue caused by dictionary and shuffle exchange
> ----------------------------------------------------
>
>                 Key: CARBONDATA-1076
>                 URL: https://issues.apache.org/jira/browse/CARBONDATA-1076
>             Project: CarbonData
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.1.1-incubating, 1.1.0
>         Environment: Carbon + spark 2.1
>            Reporter: chenerlu
>            Assignee: Ravindra Pesala
>
> We can reproduce this issue with the following steps:
>
> Step 1: create a carbon table
>
> carbon.sql("CREATE TABLE IF NOT EXISTS carbon_table (col1 int, col2 int, col3 int) STORED BY 'carbondata' TBLPROPERTIES('DICTIONARY_INCLUDE'='col1,col2,col3', 'TABLE_BLOCKSIZE'='4')")
>
> Step 2: load data
>
> carbon.sql("LOAD DATA LOCAL INPATH '/opt/carbon_table' INTO TABLE carbon_table")
>
> Data in the file carbon_table is as follows:
>
> col1,col2,col3
> 1,2,3
> 4,5,6
> 7,8,9
>
> Step 3: run the query
>
> carbon.sql("SELECT c1.col1, c2.col1, c2.col3 FROM (SELECT col1, col2 FROM carbon_table GROUP BY col1, col2) c1 FULL JOIN (SELECT col1, count(col2) AS col3 FROM carbon_table GROUP BY col1) c2 ON c1.col1 = c2.col1").show()
>
> [expected] Hive and Parquet tables return the same result, shown below, which should be correct:
>
> |col1|col1|col3|
> |   1|   1|   1|
> |   4|   4|   1|
> |   7|   7|   1|
>
> [actually] Carbon returns nulls because of wrong matches:
>
> |col1|col1|col3|
> |   1|null|null|
> |null|   4|   1|
> |   4|null|null|
> |null|   7|   1|
> |   7|null|null|
> |null|   1|   1|
>
> Root cause analysis:
>
> This query has two subqueries: one does the dictionary decode after the exchange and the other does it before the exchange, which can lead to wrong matches when the full join is executed.
>
> My idea: can we move the decode before the exchange? I am not very familiar with the Carbon query flow, so any ideas about this are welcome.
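To make the quoted root cause concrete: in the plan below, the c1 branch decodes col1 before its final Exchange (so it is hash-partitioned on the decoded value), while the c2 branch decodes col1 only after its Exchange (so it is hash-partitioned on the dictionary surrogate key). SortMergeJoin assumes both children are partitioned on the same key values, so equal col1 values need not meet. The plain-Scala sketch below illustrates that effect; the surrogate keys (2, 3, 4 standing for the values 1, 4, 7) and the hashCode-modulo partitioner are assumptions for illustration only, not CarbonData internals. (A consolidated spark-shell version of the quoted reproduction steps is sketched at the end of this page.)

// Illustration only: hypothetical dictionary, surrogate key -> decoded value.
object DecodeExchangeMismatchSketch {
  def main(args: Array[String]): Unit = {
    val numPartitions = 200
    val surrogateToValue = Map(2 -> 1, 3 -> 4, 4 -> 7) // assumed surrogates for values 1, 4, 7

    // Stand-in for Spark's hash partitioning of an integer join key.
    def partitionOf(key: Int): Int =
      ((key.hashCode % numPartitions) + numPartitions) % numPartitions

    // c1 branch: decode runs BEFORE the final Exchange, so rows are partitioned by the decoded value.
    val c1Partition = surrogateToValue.values.map(v => v -> partitionOf(v)).toMap
    // c2 branch: decode runs AFTER the final Exchange, so rows are partitioned by the surrogate key.
    val c2Partition = surrogateToValue.map { case (sk, v) => v -> partitionOf(sk) }

    // SortMergeJoin can only pair rows that land in the same partition on both sides;
    // values that are not co-located surface as the null-padded rows reported above.
    surrogateToValue.values.toSeq.sorted.foreach { v =>
      println(s"col1=$v  c1 partition=${c1Partition(v)}  c2 partition=${c2Partition(v)}  co-located=${c1Partition(v) == c2Partition(v)}")
    }
  }
}

With these assumed surrogates, none of the three values end up co-located, matching the all-null join output reported above.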
> Plan as follows:
>
> == Physical Plan ==
> SortMergeJoin [col1#3445], [col1#3460], FullOuter
> :- Sort [col1#3445 ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(col1#3445, 200)
> :     +- CarbonDictionaryDecoder [CarbonDecoderRelation(Map(col1#3445 -> col1#3445, col2#3446 -> col2#3446, col3#3447 -> col3#3447),CarbonDatasourceHadoopRelation [ Database name :tempdev, Table name :carbon_table, Schema :Some(StructType(StructField(col1,IntegerType,true), StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ]), CarbonDecoderRelation(Map(col1#3460 -> col1#3460, col2#3461 -> col2#3461, col3#3462 -> col3#3462),CarbonDatasourceHadoopRelation [ Database name :tempdev, Table name :carbon_table, Schema :Some(StructType(StructField(col1,IntegerType,true), StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ])], IncludeProfile(ArrayBuffer(col1#3445)), CarbonAliasDecoderRelation(), org.apache.spark.sql.CarbonSession@69e87cbe
> :        +- HashAggregate(keys=[col1#3445, col2#3446], functions=[], output=[col1#3445])
> :           +- Exchange hashpartitioning(col1#3445, col2#3446, 200)
> :              +- HashAggregate(keys=[col1#3445, col2#3446], functions=[], output=[col1#3445, col2#3446])
> :                 +- Scan CarbonDatasourceHadoopRelation [ Database name :tempdev, Table name :carbon_table, Schema :Some(StructType(StructField(col1,IntegerType,true), StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ] tempdev.carbon_table[col1#3445,col2#3446]
> +- Sort [col1#3460 ASC NULLS FIRST], false, 0
>    +- CarbonDictionaryDecoder [CarbonDecoderRelation(Map(col1#3445 -> col1#3445, col2#3446 -> col2#3446, col3#3447 -> col3#3447),CarbonDatasourceHadoopRelation [ Database name :tempdev, Table name :carbon_table, Schema :Some(StructType(StructField(col1,IntegerType,true), StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ]), CarbonDecoderRelation(Map(col1#3460 -> col1#3460, col2#3461 -> col2#3461, col3#3462 -> col3#3462),CarbonDatasourceHadoopRelation [ Database name :tempdev, Table name :carbon_table, Schema :Some(StructType(StructField(col1,IntegerType,true), StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ])], IncludeProfile(ArrayBuffer(col1#3460)), CarbonAliasDecoderRelation(), org.apache.spark.sql.CarbonSession@69e87cbe
>       +- HashAggregate(keys=[col1#3460], functions=[count(col2#3461)], output=[col1#3460, col3#3436L])
>          +- Exchange hashpartitioning(col1#3460, 200)
>             +- HashAggregate(keys=[col1#3460], functions=[partial_count(col2#3461)], output=[col1#3460, count#3472L])
>                +- CarbonDictionaryDecoder [CarbonDecoderRelation(Map(col1#3445 -> col1#3445, col2#3446 -> col2#3446, col3#3447 -> col3#3447),CarbonDatasourceHadoopRelation [ Database name :tempdev, Table name :carbon_table, Schema :Some(StructType(StructField(col1,IntegerType,true), StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ]), CarbonDecoderRelation(Map(col1#3460 -> col1#3460, col2#3461 -> col2#3461, col3#3462 -> col3#3462),CarbonDatasourceHadoopRelation [ Database name :tempdev, Table name :carbon_table, Schema :Some(StructType(StructField(col1,IntegerType,true), StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ])], IncludeProfile(ArrayBuffer(col2#3461)), CarbonAliasDecoderRelation(), org.apache.spark.sql.CarbonSession@69e87cbe
>                   +- Scan CarbonDatasourceHadoopRelation [ Database name :tempdev, Table name :carbon_table, Schema :Some(StructType(StructField(col1,IntegerType,true), StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ] tempdev.carbon_table[col1#3460,col2#3461]

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
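For reference, the reproduction steps quoted above, consolidated into a single spark-shell session. The CarbonSession construction follows the CarbonData 1.1 quick start, the store path is an assumption, and '/opt/carbon_table' is the csv path taken from the report.

// Pasteable into spark-shell with the CarbonData 1.1 assembly on the classpath.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._

val carbon = SparkSession.builder()
  .config(sc.getConf)
  .getOrCreateCarbonSession("hdfs://localhost:9000/carbon/store") // store path: assumption

carbon.sql(
  """CREATE TABLE IF NOT EXISTS carbon_table (col1 int, col2 int, col3 int)
    |STORED BY 'carbondata'
    |TBLPROPERTIES('DICTIONARY_INCLUDE'='col1,col2,col3', 'TABLE_BLOCKSIZE'='4')
  """.stripMargin)

carbon.sql("LOAD DATA LOCAL INPATH '/opt/carbon_table' INTO TABLE carbon_table")

val result = carbon.sql(
  """SELECT c1.col1, c2.col1, c2.col3
    |FROM (SELECT col1, col2 FROM carbon_table GROUP BY col1, col2) c1
    |FULL JOIN (SELECT col1, count(col2) AS col3 FROM carbon_table GROUP BY col1) c2
    |ON c1.col1 = c2.col1
  """.stripMargin)

result.show()     // reported to return the null-padded rows above
result.explain()  // prints the physical plan quoted above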