[ 
https://issues.apache.org/jira/browse/SPARK-23298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mateusz Jukiewicz updated SPARK-23298:
--------------------------------------
    Description: 
This is what happens (EDIT - managed to get a reproducible example):
{code:java}
/* Exemplary spark-shell starting command 
/opt/spark/bin/spark-shell \
--num-executors 269 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.kryoserializer.buffer.max=512m 

// The spark.sql.shuffle.partitions is 2154 here, if that matters
*/

val df = spark.range(10000000).withColumn("col1", (rand() * 
1000).cast("long")).withColumn("col2", (rand() * 1000).cast("long")).drop("id")
df.repartition(5240).write.parquet("/test.parquet")

// Then, ideally in a new session
val df = spark.read.parquet("/test.parquet")
df.distinct.count
// res1: Long = 1001256                                                         
   
df.distinct.count
// res2: Long = 999955   {code}
-The _text_dataset.out_ file is a dataset with one string per line. The string 
has alphanumeric characters as well as colons and spaces. The line length does 
not exceed 1200. I don't think that's important though, as the issue appeared 
on various other datasets, I just tried to narrow it down to the simplest 
possible case.- (the case is now fully reproducible with the above code)

The observations regarding the issue are as follows:
 * I managed to reproduce it on both spark 2.2 and spark 2.1.
 * The issue occurs in YARN cluster mode (I haven't tested YARN client mode).
 * The issue is not reproducible on a single machine (e.g. laptop) in spark 
local mode.
 * It seems that once the correct count is computed, it is not possible to 
reproduce the issue in the same spark session. In other words, I was able to 
get 2-3 incorrect distinct.count results consecutively, but once it got right, 
it always returned the correct value. I had to re-run spark-shell to observe 
the problem again.
 * The issue appears on both Dataset and DataFrame (i.e. using read.text or 
read.textFile).
 * The issue is not reproducible on RDD (i.e. dataset.rdd.distinct.count).
 * Not a single container has failed in those multiple invalid executions.
 * YARN doesn't show any warnings or errors in those invalid executions.
 * The execution plan determined for both valid and invalid executions was 
always the same (it's shown in the _SQL_ tab of the UI).
 * The number returned in the invalid executions was always greater than the 
correct number (24 014 227).
 * This occurs even though the input is already completely deduplicated (i.e. 
_distinct.count_ shouldn't change anything).
 * The input isn't replicated (i.e. there's only one copy of each file block on 
the HDFS).
 * The problem is probably not related to reading from HDFS. Spark was always 
able to correctly read all input records (which was shown in the UI), and that 
number got malformed after the exchange phase:
 ** correct execution:
 Input Size / Records: 3.9 GB / 24014227 _(first stage)_
 Shuffle Write: 3.3 GB / 24014227 _(first stage)_
 Shuffle Read: 3.3 GB / 24014227 _(second stage)_
 ** incorrect execution:
 Input Size / Records: 3.9 GB / 24014227 _(first stage)_
 Shuffle Write: 3.3 GB / 24014227 _(first stage)_
 Shuffle Read: 3.3 GB / 24020150 _(second stage)_
 * The problem might be related with the internal way of Encoders hashing. The 
reason might be:
 ** in a simple `distinct.count` invocation, there are in total three 
hash-related stages (called `HashAggregate`),
 ** excerpt from scaladoc for `distinct` method says:
{code:java}
   * @note Equality checking is performed directly on the encoded 
representation of the data
   * and thus is not affected by a custom `equals` function defined on 
`T`.{code}

 * One of my suspicions was the number of partitions we're using (2154). This 
is greater than 2000, which means that a different data structure (i.e. 
_HighlyCompressedMapStatus_instead of _CompressedMapStatus_) will be used for 
book-keeping during the shuffle. Unfortunately after decreasing the number 
below this threshold the problem still occurs.
 * It's easier to reproduce the issue with a large number of partitions.
 * One of my another suspicions was that it's somehow related to the number of 
blocks on the HDFS (974). I was able to reproduce the problem with both less 
and more partitions than this value, so I think this is not the case.
 * Final note: It looks like for some reason the data gets duplicated in the 
process of data exchange during the shuffle (because shuffle read sees more 
elements than shuffle write has written).

Please let me know if you have any other questions.

I couldn't find much about similar problems on the Web, the only thing I found 
was on the spark mailing list where someone using PySpark has found that one of 
his/her executors was hashing things differently than the other one which 
caused a similar issue.

I didn't include a reproducible example as this is just a long file with 
strings and as this occurred on many different datasets, I doubt it's 
data-related. If that's necessary though, please let me know and I will try to 
prepare an example.

  was:
This is what happens:
{code:java}
/* Exemplary spark-shell starting command 
/opt/spark/bin/spark-shell \
--num-executors 269 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.kryoserializer.buffer.max=512m 
*/

val dataset = spark.read.textFile("/text_dataset.out")

dataset.distinct.count
// res0: Long = 24025868
dataset.distinct.count
// res1: Long = 24014227{code}
The _text_dataset.out_ file is a dataset with one string per line. The string 
has alphanumeric characters as well as colons and spaces. The line length does 
not exceed 1200. I don't think that's important though, as the issue appeared 
on various other datasets, I just tried to narrow it down to the simplest 
possible case.

The observations regarding the issue are as follows:
 * I managed to reproduce it on both spark 2.2 and spark 2.1.
 * The issue occurs in YARN cluster mode (I haven't tested YARN client mode).
 * The issue is not reproducible on a single machine (e.g. laptop) in spark 
local mode.
 * It seems that once the correct count is computed, it is not possible to 
reproduce the issue in the same spark session. In other words, I was able to 
get 2-3 incorrect distinct.count results consecutively, but once it got right, 
it always returned the correct value. I had to re-run spark-shell to observe 
the problem again.
 * The issue appears on both Dataset and DataFrame (i.e. using read.text or 
read.textFile).
 * The issue is not reproducible on RDD (i.e. dataset.rdd.distinct.count).
 * Not a single container has failed in those multiple invalid executions.
 * YARN doesn't show any warnings or errors in those invalid executions.
 * The execution plan determined for both valid and invalid executions was 
always the same (it's shown in the _SQL_ tab of the UI).
 * The number returned in the invalid executions was always greater than the 
correct number (24 014 227).
 * This occurs even though the input is already completely deduplicated (i.e. 
_distinct.count_ shouldn't change anything).
 * The input isn't replicated (i.e. there's only one copy of each file block on 
the HDFS).
 * The problem is probably not related to reading from HDFS. Spark was always 
able to correctly read all input records (which was shown in the UI), and that 
number got malformed after the exchange phase:
 ** correct execution:
 Input Size / Records: 3.9 GB / 24014227 _(first stage)_
 Shuffle Write: 3.3 GB / 24014227 _(first stage)_
 Shuffle Read: 3.3 GB / 24014227 _(second stage)_
 ** incorrect execution:
 Input Size / Records: 3.9 GB / 24014227 _(first stage)_
 Shuffle Write: 3.3 GB / 24014227 _(first stage)_
 Shuffle Read: 3.3 GB / 24020150 _(second stage)_
 * The problem might be related with the internal way of Encoders hashing. The 
reason might be:
 ** in a simple `distinct.count` invocation, there are in total three 
hash-related stages (called `HashAggregate`),
 ** excerpt from scaladoc for `distinct` method says:
{code:java}
   * @note Equality checking is performed directly on the encoded 
representation of the data
   * and thus is not affected by a custom `equals` function defined on 
`T`.{code}

 * One of my suspicions was the number of partitions we're using (2154). This 
is greater than 2000, which means that a different data structure (i.e. 
_HighlyCompressedMapStatus_instead of _CompressedMapStatus_) will be used for 
book-keeping during the shuffle. Unfortunately after decreasing the number 
below this threshold the problem still occurs.
 * It's easier to reproduce the issue with a large number of partitions.
 * One of my another suspicions was that it's somehow related to the number of 
blocks on the HDFS (974). I was able to reproduce the problem with both less 
and more partitions than this value, so I think this is not the case.
 * Final note: It looks like for some reason the data gets duplicated in the 
process of data exchange during the shuffle (because shuffle read sees more 
elements than shuffle write has written).

Please let me know if you have any other questions.

I couldn't find much about similar problems on the Web, the only thing I found 
was on the spark mailing list where someone using PySpark has found that one of 
his/her executors was hashing things differently than the other one which 
caused a similar issue.

I didn't include a reproducible example as this is just a long file with 
strings and as this occurred on many different datasets, I doubt it's 
data-related. If that's necessary though, please let me know and I will try to 
prepare an example.


> distinct.count on Dataset/DataFrame yields non-deterministic results
> --------------------------------------------------------------------
>
>                 Key: SPARK-23298
>                 URL: https://issues.apache.org/jira/browse/SPARK-23298
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle, SQL, YARN
>    Affects Versions: 2.1.0, 2.2.0
>         Environment: Spark 2.2.0 or 2.1.0
> Java 1.8.0_144
> Yarn version:
> {code:java}
> Hadoop 2.6.0-cdh5.12.1
> Subversion http://github.com/cloudera/hadoop -r 
> 520d8b072e666e9f21d645ca6a5219fc37535a52
> Compiled by jenkins on 2017-08-24T16:43Z
> Compiled with protoc 2.5.0
> From source with checksum de51bf9693ab9426379a1cd28142cea0
> This command was run using 
> /usr/lib/hadoop/hadoop-common-2.6.0-cdh5.12.1.jar{code}
>  
>  
>            Reporter: Mateusz Jukiewicz
>            Priority: Major
>              Labels: Correctness, CorrectnessBug, correctness
>
> This is what happens (EDIT - managed to get a reproducible example):
> {code:java}
> /* Exemplary spark-shell starting command 
> /opt/spark/bin/spark-shell \
> --num-executors 269 \
> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
> --conf spark.kryoserializer.buffer.max=512m 
> // The spark.sql.shuffle.partitions is 2154 here, if that matters
> */
> val df = spark.range(10000000).withColumn("col1", (rand() * 
> 1000).cast("long")).withColumn("col2", (rand() * 
> 1000).cast("long")).drop("id")
> df.repartition(5240).write.parquet("/test.parquet")
> // Then, ideally in a new session
> val df = spark.read.parquet("/test.parquet")
> df.distinct.count
> // res1: Long = 1001256                                                       
>      
> df.distinct.count
> // res2: Long = 999955   {code}
> -The _text_dataset.out_ file is a dataset with one string per line. The 
> string has alphanumeric characters as well as colons and spaces. The line 
> length does not exceed 1200. I don't think that's important though, as the 
> issue appeared on various other datasets, I just tried to narrow it down to 
> the simplest possible case.- (the case is now fully reproducible with the 
> above code)
> The observations regarding the issue are as follows:
>  * I managed to reproduce it on both spark 2.2 and spark 2.1.
>  * The issue occurs in YARN cluster mode (I haven't tested YARN client mode).
>  * The issue is not reproducible on a single machine (e.g. laptop) in spark 
> local mode.
>  * It seems that once the correct count is computed, it is not possible to 
> reproduce the issue in the same spark session. In other words, I was able to 
> get 2-3 incorrect distinct.count results consecutively, but once it got 
> right, it always returned the correct value. I had to re-run spark-shell to 
> observe the problem again.
>  * The issue appears on both Dataset and DataFrame (i.e. using read.text or 
> read.textFile).
>  * The issue is not reproducible on RDD (i.e. dataset.rdd.distinct.count).
>  * Not a single container has failed in those multiple invalid executions.
>  * YARN doesn't show any warnings or errors in those invalid executions.
>  * The execution plan determined for both valid and invalid executions was 
> always the same (it's shown in the _SQL_ tab of the UI).
>  * The number returned in the invalid executions was always greater than the 
> correct number (24 014 227).
>  * This occurs even though the input is already completely deduplicated (i.e. 
> _distinct.count_ shouldn't change anything).
>  * The input isn't replicated (i.e. there's only one copy of each file block 
> on the HDFS).
>  * The problem is probably not related to reading from HDFS. Spark was always 
> able to correctly read all input records (which was shown in the UI), and 
> that number got malformed after the exchange phase:
>  ** correct execution:
>  Input Size / Records: 3.9 GB / 24014227 _(first stage)_
>  Shuffle Write: 3.3 GB / 24014227 _(first stage)_
>  Shuffle Read: 3.3 GB / 24014227 _(second stage)_
>  ** incorrect execution:
>  Input Size / Records: 3.9 GB / 24014227 _(first stage)_
>  Shuffle Write: 3.3 GB / 24014227 _(first stage)_
>  Shuffle Read: 3.3 GB / 24020150 _(second stage)_
>  * The problem might be related with the internal way of Encoders hashing. 
> The reason might be:
>  ** in a simple `distinct.count` invocation, there are in total three 
> hash-related stages (called `HashAggregate`),
>  ** excerpt from scaladoc for `distinct` method says:
> {code:java}
>    * @note Equality checking is performed directly on the encoded 
> representation of the data
>    * and thus is not affected by a custom `equals` function defined on 
> `T`.{code}
>  * One of my suspicions was the number of partitions we're using (2154). This 
> is greater than 2000, which means that a different data structure (i.e. 
> _HighlyCompressedMapStatus_instead of _CompressedMapStatus_) will be used for 
> book-keeping during the shuffle. Unfortunately after decreasing the number 
> below this threshold the problem still occurs.
>  * It's easier to reproduce the issue with a large number of partitions.
>  * One of my another suspicions was that it's somehow related to the number 
> of blocks on the HDFS (974). I was able to reproduce the problem with both 
> less and more partitions than this value, so I think this is not the case.
>  * Final note: It looks like for some reason the data gets duplicated in the 
> process of data exchange during the shuffle (because shuffle read sees more 
> elements than shuffle write has written).
> Please let me know if you have any other questions.
> I couldn't find much about similar problems on the Web, the only thing I 
> found was on the spark mailing list where someone using PySpark has found 
> that one of his/her executors was hashing things differently than the other 
> one which caused a similar issue.
> I didn't include a reproducible example as this is just a long file with 
> strings and as this occurred on many different datasets, I doubt it's 
> data-related. If that's necessary though, please let me know and I will try 
> to prepare an example.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to