[ https://issues.apache.org/jira/browse/SPARK-23298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577741#comment-16577741 ]
Mateusz Jukiewicz commented on SPARK-23298:
-------------------------------------------

[~tgraves] I edited the issue description and added a reproducible example which you can try out. Please keep in mind it might take several Spark sessions of "distinct counting" to observe it. On the other hand, I tried and cannot reproduce the issue on Spark 2.3.1, so the aforementioned SPARK-23207 could have fixed this one as well. Ideally you would reproduce it on Spark 2.2 and confirm that SPARK-23207 fixes this one too, but if you guys don't have time for that, I'm fine with closing this one right away.

> distinct.count on Dataset/DataFrame yields non-deterministic results
> --------------------------------------------------------------------
>
>                 Key: SPARK-23298
>                 URL: https://issues.apache.org/jira/browse/SPARK-23298
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle, SQL, YARN
>    Affects Versions: 2.1.0, 2.2.0
>         Environment: Spark 2.2.0 or 2.1.0
> Java 1.8.0_144
> Yarn version:
> {code:java}
> Hadoop 2.6.0-cdh5.12.1
> Subversion http://github.com/cloudera/hadoop -r 520d8b072e666e9f21d645ca6a5219fc37535a52
> Compiled by jenkins on 2017-08-24T16:43Z
> Compiled with protoc 2.5.0
> From source with checksum de51bf9693ab9426379a1cd28142cea0
> This command was run using /usr/lib/hadoop/hadoop-common-2.6.0-cdh5.12.1.jar{code}
>
>            Reporter: Mateusz Jukiewicz
>            Priority: Major
>              Labels: Correctness, CorrectnessBug, correctness
>
> This is what happens (EDIT - managed to get a reproducible example):
> {code:java}
> /* Exemplary spark-shell starting command
> /opt/spark/bin/spark-shell \
> --num-executors 269 \
> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
> --conf spark.kryoserializer.buffer.max=512m
> // The spark.sql.shuffle.partitions is 2154 here, if that matters
> */
> val df = spark.range(10000000).withColumn("col1", (rand() * 1000).cast("long")).withColumn("col2", (rand() * 1000).cast("long")).drop("id")
> df.repartition(5240).write.parquet("/test.parquet")
> // Then, ideally in a new session
> val df = spark.read.parquet("/test.parquet")
> df.distinct.count
> // res1: Long = 1001256
>
> df.distinct.count
> // res2: Long = 999955 {code}
> -The _text_dataset.out_ file is a dataset with one string per line. The string has alphanumeric characters as well as colons and spaces. The line length does not exceed 1200. I don't think that's important though, as the issue appeared on various other datasets; I just tried to narrow it down to the simplest possible case.- (the case is now fully reproducible with the above code)
> The observations regarding the issue are as follows:
> * I managed to reproduce it on both Spark 2.2 and Spark 2.1.
> * The issue occurs in YARN cluster mode (I haven't tested YARN client mode).
> * The issue is not reproducible on a single machine (e.g. a laptop) in Spark local mode.
> * It seems that once the correct count is computed, it is not possible to reproduce the issue in the same Spark session. In other words, I was able to get 2-3 incorrect distinct.count results consecutively, but once it got it right, it always returned the correct value. I had to re-run spark-shell to observe the problem again.
> * The issue appears on both Dataset and DataFrame (i.e. using read.text or read.textFile).
> * The issue is not reproducible on RDD (i.e. dataset.rdd.distinct.count).
> * Not a single container has failed in those multiple invalid executions.
> * YARN doesn't show any warnings or errors in those invalid executions.
> * The execution plan determined for both valid and invalid executions was always the same (it's shown in the _SQL_ tab of the UI).
> * The number returned in the invalid executions was always greater than the correct number (24 014 227).
> * This occurs even though the input is already completely deduplicated (i.e. _distinct.count_ shouldn't change anything).
> * The input isn't replicated (i.e. there's only one copy of each file block on the HDFS).
> * The problem is probably not related to reading from HDFS. Spark was always able to correctly read all input records (which was shown in the UI), and that number got malformed after the exchange phase:
> ** correct execution:
> Input Size / Records: 3.9 GB / 24014227 _(first stage)_
> Shuffle Write: 3.3 GB / 24014227 _(first stage)_
> Shuffle Read: 3.3 GB / 24014227 _(second stage)_
> ** incorrect execution:
> Input Size / Records: 3.9 GB / 24014227 _(first stage)_
> Shuffle Write: 3.3 GB / 24014227 _(first stage)_
> Shuffle Read: 3.3 GB / 24020150 _(second stage)_
> * The problem might be related to the way Encoders hash the data internally. The reasons for this suspicion are:
> ** in a simple `distinct.count` invocation there are in total three hash-related stages (called `HashAggregate`),
> ** an excerpt from the scaladoc for the `distinct` method says:
> {code:java}
> * @note Equality checking is performed directly on the encoded representation of the data
> * and thus is not affected by a custom `equals` function defined on `T`.{code}
> * One of my suspicions was the number of partitions we're using (2154). This is greater than 2000, which means that a different data structure (i.e. _HighlyCompressedMapStatus_ instead of _CompressedMapStatus_) will be used for book-keeping during the shuffle. Unfortunately, after decreasing the number below this threshold the problem still occurs.
> * It's easier to reproduce the issue with a large number of partitions.
> * Another of my suspicions was that it's somehow related to the number of blocks on the HDFS (974). I was able to reproduce the problem with both fewer and more partitions than this value, so I think this is not the case.
> * Final note: It looks like for some reason the data gets duplicated in the process of data exchange during the shuffle (because shuffle read sees more elements than shuffle write has written).
> Please let me know if you have any other questions.
> I couldn't find much about similar problems on the Web. The only thing I found was a thread on the Spark mailing list where someone using PySpark found that one of their executors was hashing things differently than the others, which caused a similar issue.
> I didn't include a reproducible example as this is just a long file with strings, and as this occurred on many different datasets, I doubt it's data-related. If that's necessary though, please let me know and I will try to prepare an example.
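
For anyone re-running the reproduction, here is a minimal sketch (not part of the original report) that re-reads the parquet data written by the example above and compares the Dataset-level distinct count against the RDD-level one, which the observations say is unaffected; any non-zero difference flags a bad run. The path /test.parquet and the repetition count are just taken from the example, and per the observations a fresh spark-shell session may be needed once a correct count has been returned.

{code:java}
// Sketch only: compare Dataset vs RDD distinct counts on the generated data.
// Run in a fresh spark-shell session on the affected cluster.
val df = spark.read.parquet("/test.parquet")

// The report states the RDD path is not affected, so use it as a reference value.
val rddCount = df.rdd.distinct().count()

(1 to 5).foreach { i =>
  val dsCount = df.distinct.count
  println(s"run $i: dataset distinct = $dsCount, rdd distinct = $rddCount, diff = ${dsCount - rddCount}")
}
{code}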
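A similar sketch can re-check the map-status suspicion from the observations: it toggles spark.sql.shuffle.partitions around the 2000 threshold at which the shuffle book-keeping switches from _CompressedMapStatus_ to _HighlyCompressedMapStatus_. This is illustrative only; the report already notes that dropping below the threshold did not help, and since a correct result seems to "lock in" for the rest of the session, each setting is ideally tried in a separate spark-shell.

{code:java}
// Sketch only: repeat the distinct count with shuffle partition counts on
// either side of the 2000 threshold that switches the MapStatus format.
val df = spark.read.parquet("/test.parquet")

Seq("1999", "2154").foreach { n =>
  spark.conf.set("spark.sql.shuffle.partitions", n)
  println(s"shuffle partitions = $n, distinct count = ${df.distinct.count}")
}
{code}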
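Finally, the three hash-related stages mentioned in the observations can be inspected in the physical plan. The sketch below is only an approximation of what df.distinct.count executes (count itself is an action, so a groupBy().count() over the deduplicated data is explained instead):

{code:java}
// Sketch only: print the physical plans to see the HashAggregate / Exchange steps.
val df = spark.read.parquet("/test.parquet")

// Deduplication alone: partial HashAggregate -> Exchange (shuffle) -> final HashAggregate.
df.distinct.explain()

// Deduplication followed by a global count, roughly what df.distinct.count runs.
df.distinct.groupBy().count().explain()
{code}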