[ https://issues.apache.org/jira/browse/SPARK-23076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16326304#comment-16326304 ]
Wenchen Fan commented on SPARK-23076:
-------------------------------------

To be clear, you maintain an internal Spark version and cache the MapPartitionsRDD by changing the Spark SQL code base? Then this is not a bug...

When we call cache() on an RDD that depends on ShuffledRowRDD, we get an incorrect result
------------------------------------------------------------------------------------------

                 Key: SPARK-23076
                 URL: https://issues.apache.org/jira/browse/SPARK-23076
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.1.0
            Reporter: zhoukang
            Priority: Major
     Attachments: shufflerowrdd-cache.png

For the query below:
{code:java}
select * from csv_demo limit 3;
{code}
the correct result is:
{code}
0: jdbc:hive2://10.108.230.228:10000/> select * from csv_demo limit 3;
+----------+------+--+
|   _c0    | _c1  |
+----------+------+--+
| Joe      | 20   |
| Tom      | 30   |
| Hyukjin  | 25   |
+----------+------+--+
{code}
However, when we call cache() on the MapPartitionsRDD shown below:
!shufflerowrdd-cache.png!
the result is wrong:
{code}
0: jdbc:hive2://xxx/> select * from csv_demo limit 3;
+----------+------+--+
|   _c0    | _c1  |
+----------+------+--+
| Hyukjin  | 25   |
| Hyukjin  | 25   |
| Hyukjin  | 25   |
+----------+------+--+
{code}
The reason this happens is that all of the UnsafeRow instances generated by ShuffledRowRDD#compute share the same underlying byte buffer, so every row the iterator yields is backed by the same object.

I printed some logs to demonstrate this by modifying UnsafeRow.toString():
{code:java}
// This is for debugging
@Override
public String toString() {
  StringBuilder build = new StringBuilder("[");
  for (int i = 0; i < sizeInBytes; i += 8) {
    if (i != 0) build.append(',');
    build.append(java.lang.Long.toHexString(Platform.getLong(baseObject, baseOffset + i)));
  }
  // Print baseObject and baseOffset so we can see which buffer backs each row
  build.append("," + baseObject + "," + baseOffset + ']');
  return build.toString();
}
{code}
All three rows report the same baseObject ([B@6225ec90):
{code}
2018-01-12,22:08:47,441 INFO org.apache.spark.sql.execution.ShuffledRowRDD: Read value: [0,1800000003,2000000002,656f4a,3032,[B@6225ec90,16]
2018-01-12,22:08:47,445 INFO org.apache.spark.sql.execution.ShuffledRowRDD: Read value: [0,1800000003,2000000002,6d6f54,3033,[B@6225ec90,16]
2018-01-12,22:08:47,448 INFO org.apache.spark.sql.execution.ShuffledRowRDD: Read value: [0,1800000007,2000000002,6e696a6b757948,3532,[B@6225ec90,16]
{code}
We can fix this by adding a config and copying each UnsafeRow read from the ShuffledRowRDD iterator when the config is enabled, like below:
{code:scala}
reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map { x =>
  if (SparkEnv.get.conf.get(StaticSQLConf.UNSAFEROWRDD_CACHE_ENABLE)
      && x._2.isInstanceOf[UnsafeRow]) {
    x._2.asInstanceOf[UnsafeRow].copy()
  } else {
    x._2
  }
}
{code}
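For readers unfamiliar with the row-reuse pattern, here is a minimal, self-contained Scala sketch of why materializing an iterator of reused objects duplicates the last value, and why copying before caching fixes it. MutableRow here is a hypothetical stand-in for Spark's UnsafeRow, not a Spark class:
{code:scala}
// Sketch of the row-reuse hazard described above: one mutable object is
// overwritten for each element the iterator yields, like the shared byte
// buffer behind the UnsafeRows from ShuffledRowRDD#compute.
final class MutableRow(var value: String) {
  def copy(): MutableRow = new MutableRow(value)
  override def toString: String = value
}

object RowReuseDemo extends App {
  val data = Seq("Joe", "Tom", "Hyukjin")

  def rows: Iterator[MutableRow] = {
    val shared = new MutableRow("") // single backing object, reused per element
    data.iterator.map { v => shared.value = v; shared }
  }

  // Caching without copying: every cached slot holds a reference to the same
  // object, so after iteration all of them show the last value written.
  val cachedWrong = rows.toArray
  println(cachedWrong.mkString(", ")) // Hyukjin, Hyukjin, Hyukjin

  // Copying each row before materializing (what the proposed fix does with
  // UnsafeRow.copy()) snapshots the contents while they are still valid.
  val cachedRight = rows.map(_.copy()).toArray
  println(cachedRight.mkString(", ")) // Joe, Tom, Hyukjin
}
{code}
This mirrors what cache() does to the MapPartitionsRDD: the block store materializes the iterator's elements, and without a per-element copy() every stored row ends up pointing at the same mutated buffer, which is why all three rows come back as "Hyukjin".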