[ 
https://issues.apache.org/jira/browse/SPARK-23076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16326304#comment-16326304
 ] 

Wenchen Fan commented on SPARK-23076:
-------------------------------------

To be clear, you maintain an internal Spark version and cache the 
MapPartitionsRDD by changing the Spark SQL code base? Then this is not a bug...

> When we call cache() on RDD which depends on ShuffleRowRDD, we will get an 
> error result
> ---------------------------------------------------------------------------------------
>
>                 Key: SPARK-23076
>                 URL: https://issues.apache.org/jira/browse/SPARK-23076
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0
>            Reporter: zhoukang
>            Priority: Major
>         Attachments: shufflerowrdd-cache.png
>
>
> For the query below:
> {code:java}
> select * from csv_demo limit 3;
> {code}
> The correct result should be:
>  0: jdbc:hive2://10.108.230.228:10000/> select * from csv_demo limit 3;
> +---------+-----+
> | _c0     | _c1 |
> +---------+-----+
> | Joe     | 20  |
> | Tom     | 30  |
> | Hyukjin | 25  |
> +---------+-----+
>  However, when we call cache() on the MapPartitionsRDD shown below:
>   !shufflerowrdd-cache.png!
> the result is wrong:
>  0: jdbc:hive2://xxx/> select * from csv_demo limit 3;
> +---------+-----+
> | _c0     | _c1 |
> +---------+-----+
> | Hyukjin | 25  |
> | Hyukjin | 25  |
> | Hyukjin | 25  |
> +---------+-----+
>  This happens because the UnsafeRow instances generated by
> ShuffledRowRDD#compute all share the same underlying byte buffer.
> I added some logging to show this.
> First, modify UnsafeRow.toString():
> {code:java}
> // This is for debugging
> @Override
> public String toString() {
>   StringBuilder build = new StringBuilder("[");
>   for (int i = 0; i < sizeInBytes; i += 8) {
>     if (i != 0) build.append(',');
>     build.append(java.lang.Long.toHexString(Platform.getLong(baseObject, baseOffset + i)));
>   }
>   // Print baseObject and baseOffset here
>   build.append("," + baseObject + "," + baseOffset + ']');
>   return build.toString();
> }{code}
> {code:java}
> 2018-01-12,22:08:47,441 INFO org.apache.spark.sql.execution.ShuffledRowRDD: Read value: [0,1800000003,2000000002,656f4a,3032,[B@6225ec90,16]
> 2018-01-12,22:08:47,445 INFO org.apache.spark.sql.execution.ShuffledRowRDD: Read value: [0,1800000003,2000000002,6d6f54,3033,[B@6225ec90,16]
> 2018-01-12,22:08:47,448 INFO org.apache.spark.sql.execution.ShuffledRowRDD: Read value: [0,1800000007,2000000002,6e696a6b757948,3532,[B@6225ec90,16]
> {code}
> We can fix this by adding a config and, when it is true, copying each
> UnsafeRow read by the ShuffledRowRDD iterator, as below:
> {code:java}
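> // UNSAFEROWRDD_CACHE_ENABLE is the new config proposed in this issue.
> val copyRows = SparkEnv.get.conf.get(StaticSQLConf.UNSAFEROWRDD_CACHE_ENABLE)
> reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map { x =>
>   x._2 match {
>     // Copy so that each row owns its bytes instead of sharing the reused buffer.
>     case row: UnsafeRow if copyRows => row.copy()
>     case row => row
>   }
> }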
> {code}
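
For illustration only, here is a minimal Java sketch of the buffer-reuse behavior described in the quoted issue. It does not use Spark: `Row`, `read`, and `cache` are hypothetical stand-ins for UnsafeRow, the shuffle reader iterator, and RDD caching, and the 64-byte buffer size is an arbitrary assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class BufferReuseDemo {
    /** Hypothetical stand-in for UnsafeRow: a view over a byte buffer. */
    static final class Row {
        final byte[] buf;
        int length;

        Row(byte[] buf) { this.buf = buf; }

        /** Returns a row that owns a private copy of the current bytes. */
        Row copy() {
            Row r = new Row(Arrays.copyOf(buf, length));
            r.length = length;
            return r;
        }

        @Override
        public String toString() {
            return new String(buf, 0, length, StandardCharsets.UTF_8);
        }
    }

    /** Hypothetical reader: every call to next() overwrites one shared buffer. */
    static Iterator<Row> read(String... values) {
        final byte[] shared = new byte[64]; // assumed large enough for the demo values
        final Row row = new Row(shared);
        final Iterator<String> it = Arrays.asList(values).iterator();
        return new Iterator<Row>() {
            @Override public boolean hasNext() { return it.hasNext(); }
            @Override public Row next() {
                byte[] b = it.next().getBytes(StandardCharsets.UTF_8);
                System.arraycopy(b, 0, shared, 0, b.length);
                row.length = b.length;
                return row; // same object and same buffer every time
            }
        };
    }

    /** Materializes the rows first (as a cache would), then renders them. */
    static List<String> cache(Iterator<Row> rows, boolean copy) {
        List<Row> cached = new ArrayList<>();
        while (rows.hasNext()) {
            Row r = rows.next();
            cached.add(copy ? r.copy() : r);
        }
        List<String> out = new ArrayList<>();
        for (Row r : cached) {
            out.add(r.toString());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(cache(read("Joe", "Tom", "Hyukjin"), false)); // [Hyukjin, Hyukjin, Hyukjin]
        System.out.println(cache(read("Joe", "Tom", "Hyukjin"), true));  // [Joe, Tom, Hyukjin]
    }
}
```

Holding references without copying leaves every cached entry pointing at the last value written to the buffer, which matches the repeated row in the quoted output. Copying each row before storing it avoids that, at the cost of one allocation per row.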


