[ https://issues.apache.org/jira/browse/SPARK-29336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Guilherme Souza updated SPARK-29336:
------------------------------------
Description:
Hello Spark maintainers,

I was experimenting with my own implementation of the [space-efficient quantile algorithm|http://infolab.stanford.edu/~datar/courses/cs361a/papers/quantiles.pdf] in another language, using Spark's implementation as a reference. In my analysis, I believe I have found an issue in the {{merge()}} logic.

Here is some simple Scala code that reproduces the issue:
{code:java}
val values = (1 to 100).toArray
val allQuantiles = values.indices.map(i => (i + 1).toDouble / values.length).toArray
for (n <- 0 until 5) {
  val df = spark.sparkContext.makeRDD(values).toDF("value").repartition(5)
  val allAnswers = df.stat.approxQuantile("value", allQuantiles, 0.1)
  val allAnsweredRanks = allAnswers.map(ans => values.indexOf(ans))
  val errors = allAnsweredRanks.zipWithIndex.map { case (answer, expected) =>
    math.abs(expected - answer)
  }
  println(errors.max)
}
{code}
I query every possible quantile of a 100-element array with a desired maximum relative error of 10%. In this scenario, one would expect a maximum rank error of 10 or less (10% of 100). However, the output I observe is:
{noformat}
16
12
10
11
17{noformat}
The variance is probably due to non-deterministic operations behind the scenes, but that is irrelevant to the core cause. Interestingly enough, if I change from five partitions to one, the code behaves as expected and prints 10 every time.
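The rank-error check in the snippet above can be factored into a small standalone helper (no Spark required) to make the measurement explicit. {{maxRankError}} and its arguments are my own naming for illustration, not anything from Spark:

```scala
// Hypothetical standalone helper: given the ground-truth values (sorted,
// distinct integers) and the answer returned for each quantile (answers(i)
// is the approximate (i+1)/n-quantile), report the worst rank deviation.
def maxRankError(values: Array[Int], answers: Array[Double]): Int =
  answers.zipWithIndex.map { case (ans, expectedRank) =>
    math.abs(values.indexOf(ans.toInt) - expectedRank)
  }.max

// Sanity check: exact answers yield zero rank error.
val values = (1 to 100).toArray
val exactAnswers = values.map(_.toDouble)
assert(maxRankError(values, exactAnswers) == 0)
```

With {{approxQuantile(..., allQuantiles, 0.1)}} over these 100 values, any result above 10 from this helper indicates the promised relative error was not respected.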
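For context on why 10 ranks is the right expectation: under the standard analysis of the paper, an ε-approximate summary of n elements carries at most ε·n of rank slack, so a correct merge of several summaries should keep the combined absolute rank error within the sum of the per-summary slacks. A quick sketch of that arithmetic (my own framing of the bound, not Spark code):

```scala
// Worst-case absolute rank error after merging GK summaries, assuming each
// summary with relative error eps over n elements contributes at most
// eps * n of rank slack (the standard analysis).
def mergedRankErrorBound(parts: Seq[(Double, Long)]): Double =
  parts.map { case (eps, n) => eps * n }.sum

def mergedRelativeErrorBound(parts: Seq[(Double, Long)]): Double =
  mergedRankErrorBound(parts) / parts.map(_._2).sum

// Five partitions of 20 elements each at eps = 0.1: the merged summary
// should still answer within 10 ranks, i.e. keep the 10% relative error.
val parts = Seq.fill(5)((0.1, 20L))
assert(mergedRankErrorBound(parts) == 10.0)
assert(mergedRelativeErrorBound(parts) == 0.1)
```

So with five partitions of roughly 20 elements each, a sound merge should never exceed 10 ranks of error on this data, which is exactly what the single-partition run achieves and the multi-partition run violates.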
This seems to point to some problem in the [merge logic|https://github.com/apache/spark/blob/51d6ba7490eaac32fc33b8996fdf06b747884a54/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala#L153-L171]. The original authors ([~clockfly] and [~cloud_fan], as far as I could dig from the history) note that the published paper is not clear on how the merge should be done, and, honestly, I am not confident in the current approach either. I found SPARK-21184, which reports the same problem, but it was unfortunately closed with no fix applied.

In my external implementation I believe I have found a sound way to implement the merge method. [Here is my take in Rust, if relevant|https://github.com/sitegui/space-efficient-quantile/blob/188c74638c9840e5f47d6c6326b2886d47b149bc/src/modified_gk/summary.rs#L162-L218]. I would be glad to add unit tests and contribute my implementation adapted to Scala.

I'd love to hear your opinion on the matter.

Best regards

> The implementation of QuantileSummaries.merge does not guarantee that the
> relativeError will be respected
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-29336
>                 URL: https://issues.apache.org/jira/browse/SPARK-29336
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.3
>            Reporter: Guilherme Souza
>            Priority: Minor
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org