[jira] [Commented] (SPARK-47836) Performance problem with QuantileSummaries

2024-04-12 Thread Tanel Kiis (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-47836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836606#comment-17836606
 ] 

Tanel Kiis commented on SPARK-47836:


I would be willing to make a PR, but I do not know the right solution.
Migrating from the custom implementation to KllDoublesSketch would be the cleanest 
option, but its results are nondeterministic.
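A minimal sketch of that nondeterminism concern, reusing only the datasketches calls 
from the benchmark below (newHeapInstance, update, merge, getQuantile); the input size 
and the k value are arbitrary choices of mine:
{code}
import scala.util.Random
import org.apache.datasketches.kll.KllDoublesSketch

object KllNondeterminismSketch {
  // Builds partial sketches per "partition" and merges them, like an aggregation would.
  def medianOf(values: Seq[Double]): Double =
    values.grouped(1000).map { chunk =>
      val s = KllDoublesSketch.newHeapInstance(200) // k = 200 chosen arbitrarily
      chunk.foreach(v => s.update(v))
      s
    }.reduce { (a, b) => a.merge(b); a }.getQuantile(0.5)

  def main(args: Array[String]): Unit = {
    val values = Seq.fill(100000)(Random.nextDouble())
    // The sketch compacts with random offsets, so two runs over the exact same
    // input may report slightly different (but still within-error) medians.
    println(medianOf(values))
    println(medianOf(values))
  }
}
{code}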

> Performance problem with QuantileSummaries
> --
>
> Key: SPARK-47836
> URL: https://issues.apache.org/jira/browse/SPARK-47836
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.5.1
>Reporter: Tanel Kiis
>Priority: Major
>
> SPARK-29336 caused a severe performance regression.
> In practice, a partial_aggregate with several approx_percentile calls ran in 
> less than an hour, while the final aggregation after the exchange would have 
> taken over a week.
> A plain percentile ran in about the same time for the first part, and its 
> final aggregate ran very quickly.
> I made a benchmark, and it reveals that the merge operation is extremely 
> slow: 
> https://github.com/tanelk/spark/commit/3b16f429a77b10003572295f42361fbfb2f3c63e
> From my experiments it looks like it is O(n^2) in the number of partitions 
> (the number of partial aggregations to merge).
> When I reverted the changes made in that PR, the "Only insert" and 
> "Insert & merge" cases performed very similarly.
> The cause seems to be that compressImmut barely reduces the number of samples 
> after merges and just keeps iterating over an ever-growing list.
> I was not able to figure out how to fix the issue without just reverting the 
> PR.
> I also added a benchmark with KllDoublesSketch from the Apache DataSketches 
> project, and it performed even better than this class did before that PR.
> The only downside is that it is non-deterministic. 






[jira] [Commented] (SPARK-47836) Performance problem with QuantileSummaries

2024-04-12 Thread Tanel Kiis (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-47836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836605#comment-17836605
 ] 

Tanel Kiis commented on SPARK-47836:



{noformat}
QuantileSummaries:                 Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
-----------------------------------------------------------------------------------------------------------------
Only insert                                  168             171           8         6.0         167.6       1.0X
Insert & merge                              6690            6792         143         0.1        6690.3       0.0X
KllFloatsSketch insert                        44              47           6        22.5          44.4       3.8X
KllFloatsSketch Insert & merge                55              57           6        18.3          54.7       3.1X
{noformat}


{code:java}
import scala.util.Random

import org.apache.datasketches.kll.{KllDoublesSketch, KllSketch}

import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
import org.apache.spark.sql.catalyst.util.QuantileSummaries

object QuantileSummariesBenchmark extends BenchmarkBase {

  def test(name: String, numValues: Int): Unit = {
runBenchmark(name) {
  val values = (1 to numValues).map(_ => Random.nextDouble())

  val benchmark = new Benchmark(name, numValues, output = output)
  benchmark.addCase("Only insert") { _: Int =>
var summaries = new QuantileSummaries(
  compressThreshold = QuantileSummaries.defaultCompressThreshold,
  relativeError = QuantileSummaries.defaultRelativeError)

for (value <- values) {
  summaries = summaries.insert(value)
}
summaries = summaries.compress()

println("Median: " + summaries.query(0.5))
  }

  benchmark.addCase("Insert & merge") { _: Int =>
// Insert values in batches of 1000 and merge the summaries.
val summaries = values.grouped(1000).map(vs => {
  var partialSummaries = new QuantileSummaries(
compressThreshold = QuantileSummaries.defaultCompressThreshold,
relativeError = QuantileSummaries.defaultRelativeError)

  for (value <- vs) {
partialSummaries = partialSummaries.insert(value)
  }
  partialSummaries.compress()
}).reduce(_.merge(_))

println("Median: " + summaries.query(0.5))
  }

  benchmark.addCase("KllFloatsSketch insert") { _: Int =>
// Insert all values into a single sketch (no batching or merging).
val summaries = KllDoublesSketch.newHeapInstance(
  KllSketch.getKFromEpsilon(QuantileSummaries.defaultRelativeError, 
true)
)

for (value <- values) {
  summaries.update(value)
}

println("Median: " + summaries.getQuantile(0.5))
  }

  benchmark.addCase("KllFloatsSketch Insert & merge") { _: Int =>
// Insert values in batches of 1000 and merge the summaries.
val summaries = values.grouped(1000).map(vs => {
  val partialSummaries = KllDoublesSketch.newHeapInstance(
KllSketch.getKFromEpsilon(QuantileSummaries.defaultRelativeError, 
true)
  )

  for (value <- vs) {
partialSummaries.update(value)
  }

  partialSummaries
}).reduce((a, b) => {
  a.merge(b)
  a
})

println("Median: " + summaries.getQuantile(0.5))
  }

  benchmark.run()
}
  }

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
test("QuantileSummaries", 1_000_000)
  }
}
{code}


> Performance problem with QuantileSummaries
> --
>
> Key: SPARK-47836
> URL: https://issues.apache.org/jira/browse/SPARK-47836
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.5.1
>Reporter: Tanel Kiis
>Priority: Major
>
> SPARK-29336 caused a severe performance regression.
> In practice, a partial_aggregate with several approx_percentile calls ran in 
> less than an hour, while the final aggregation after the exchange would have 
> taken over a week.
> A plain percentile ran in about the same time for the first part, and its 
> final aggregate ran very quickly.
> I made a benchmark, and it reveals that the merge operation is extremely 
> slow: 
> https://github.com/tanelk/spark/commit/3b16f429a77b10003572295f42361fbfb2f3c63e
> From my experiments it looks like it is O(n^2) in the number of partitions 
> (the number of partial aggregations to merge).
> When I reverted the changes made in that PR, the "Only insert" and 
> "Insert & merge" cases performed very similarly.
> The cause seems to be that compressImmut barely reduces the number of samples 
> after merges and just keeps iterating over an ever-growing list.
> I was not able to figure out how to fix the issue without just reverting the 
> PR.
> I also added a benchmark with KllDoublesSketch from the Apache DataSketches 
> project, and it performed even better than this class did before that PR.
> The only downside is that it is non-deterministic. 

[jira] [Updated] (SPARK-47836) Performance problem with QuantileSummaries

2024-04-12 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-47836:
---
Description: 
SPARK-29336 caused a severe performance regression.
In practice, a partial_aggregate with several approx_percentile calls ran in less 
than an hour, while the final aggregation after the exchange would have taken 
over a week.
A plain percentile ran in about the same time for the first part, and its final 
aggregate ran very quickly.

I made a benchmark, and it reveals that the merge operation is extremely slow: 
https://github.com/tanelk/spark/commit/3b16f429a77b10003572295f42361fbfb2f3c63e
From my experiments it looks like it is O(n^2) in the number of partitions 
(the number of partial aggregations to merge).
When I reverted the changes made in that PR, the "Only insert" and "Insert 
& merge" cases performed very similarly.

The cause seems to be that compressImmut barely reduces the number of samples 
after merges and just keeps iterating over an ever-growing list.
I was not able to figure out how to fix the issue without just reverting the PR.

I also added a benchmark with KllDoublesSketch from the Apache DataSketches 
project, and it performed even better than this class did before that PR.
The only downside is that it is non-deterministic. 
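To make the compressImmut claim easier to check, here is a small diagnostic sketch 
(assuming the same QuantileSummaries API as the benchmark attached in the comments, 
plus its public sampled buffer); the partition count and sizes are arbitrary:
{code}
import scala.util.Random

import org.apache.spark.sql.catalyst.util.QuantileSummaries

object MergeGrowthSketch {
  // Builds one compressed partial summary, like a partial aggregation would.
  def partial(values: Seq[Double]): QuantileSummaries = {
    var s = new QuantileSummaries(
      compressThreshold = QuantileSummaries.defaultCompressThreshold,
      relativeError = QuantileSummaries.defaultRelativeError)
    values.foreach(v => s = s.insert(v))
    s.compress()
  }

  def main(args: Array[String]): Unit = {
    val partials = Seq.fill(200)(partial(Seq.fill(1000)(Random.nextDouble())))
    var merged = partials.head
    partials.tail.zipWithIndex.foreach { case (p, i) =>
      merged = merged.merge(p)
      // If compression kept working across merges, this would stay roughly flat;
      // the regression reported here is that it keeps growing with every merge.
      if ((i + 2) % 50 == 0) {
        println(s"after ${i + 2} merges: ${merged.sampled.length} samples")
      }
    }
  }
}
{code}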

> Performance problem with QuantileSummaries
> --
>
> Key: SPARK-47836
> URL: https://issues.apache.org/jira/browse/SPARK-47836
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.5.1
>Reporter: Tanel Kiis
>Priority: Major
>
> SPARK-29336 caused a severe performance regression.
> In practice, a partial_aggregate with several approx_percentile calls ran in 
> less than an hour, while the final aggregation after the exchange would have 
> taken over a week.
> A plain percentile ran in about the same time for the first part, and its 
> final aggregate ran very quickly.
> I made a benchmark, and it reveals that the merge operation is extremely 
> slow: 
> https://github.com/tanelk/spark/commit/3b16f429a77b10003572295f42361fbfb2f3c63e
> From my experiments it looks like it is O(n^2) in the number of partitions 
> (the number of partial aggregations to merge).
> When I reverted the changes made in that PR, the "Only insert" and 
> "Insert & merge" cases performed very similarly.
> The cause seems to be that compressImmut barely reduces the number of samples 
> after merges and just keeps iterating over an ever-growing list.
> I was not able to figure out how to fix the issue without just reverting the 
> PR.
> I also added a benchmark with KllDoublesSketch from the Apache DataSketches 
> project, and it performed even better than this class did before that PR.
> The only downside is that it is non-deterministic. 






[jira] [Created] (SPARK-46070) Precompile regex patterns in SparkDateTimeUtils.getZoneId

2023-11-23 Thread Tanel Kiis (Jira)
Tanel Kiis created SPARK-46070:
--

 Summary: Precompile regex patterns in SparkDateTimeUtils.getZoneId
 Key: SPARK-46070
 URL: https://issues.apache.org/jira/browse/SPARK-46070
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.5.0
Reporter: Tanel Kiis


SparkDateTimeUtils.getZoneId uses the String.replaceFirst method, which internally 
does a Pattern.compile(regex). This method is called once for each dataset row 
when using functions like from_utc_timestamp.
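A minimal sketch of the proposed direction (the regex and helper below are illustrative, 
not the actual SparkDateTimeUtils code): compile the Pattern once and reuse it, instead 
of letting String.replaceFirst recompile it on every call.
{code}
import java.util.regex.Pattern

object PrecompiledReplaceFirst {
  // Illustrative pattern only; compiled once instead of on every call.
  private val singleDigitHour: Pattern = Pattern.compile("(\\+|\\-)(\\d):")

  def normalize(tz: String): String =
    singleDigitHour.matcher(tz).replaceFirst("$10$2:")

  def main(args: Array[String]): Unit = {
    // Per-row hot path today: String.replaceFirst compiles the regex each time.
    val slow = "GMT+8:00".replaceFirst("(\\+|\\-)(\\d):", "$10$2:")
    // Precompiled path: only the matching work remains per call.
    val fast = normalize("GMT+8:00")
    println(s"$slow == $fast") // GMT+08:00 == GMT+08:00
  }
}
{code}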






[jira] [Updated] (SPARK-40664) Union in query can remove cache from the plan

2022-10-05 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-40664:
---
Description: 
Failing unit test:
{code}
  test("SPARK-40664: Cache with join, union and renames") {
val df1 = Seq("1", "2").toDF("a")
val df2 = Seq("2", "3").toDF("a")
  .withColumn("b", lit("b"))

val joined = df1.join(broadcast(df2), "a")
  // Messing around with the column can cause some problems with the cache manager
  .withColumn("tmp_b", $"b")
  .drop("b")
  .withColumnRenamed("tmp_b", "b")
  .cache()

val unioned = joined.union(joined)

assertCached(unioned, 2)
  }
{code}

After this PR the test started failing: 
https://github.com/apache/spark/pull/35214

Plan before:
{code}
== Physical Plan ==
Union
:- InMemoryTableScan [a#4, b#23]
: +- InMemoryRelation [a#4, b#23], StorageLevel(disk, memory, deserialized, 
1 replicas)
:   +- *(2) Project [a#4, b AS b#23]
:  +- *(2) BroadcastHashJoin [a#4], [a#10], Inner, BuildRight, false
: :- *(2) Project [value#1 AS a#4]
: :  +- *(2) Filter isnotnull(value#1)
: : +- *(2) LocalTableScan [value#1]
: +- BroadcastExchange 
HashedRelationBroadcastMode(List(input[0, string, true]),false), [id=#35]
:+- *(1) Project [value#7 AS a#10]
:   +- *(1) Filter isnotnull(value#7)
:  +- *(1) LocalTableScan [value#7]
+- InMemoryTableScan [a#4, b#23]
  +- InMemoryRelation [a#4, b#23], StorageLevel(disk, memory, deserialized, 
1 replicas)
+- *(2) Project [a#4, b AS b#23]
   +- *(2) BroadcastHashJoin [a#4], [a#10], Inner, BuildRight, false
  :- *(2) Project [value#1 AS a#4]
  :  +- *(2) Filter isnotnull(value#1)
  : +- *(2) LocalTableScan [value#1]
  +- BroadcastExchange 
HashedRelationBroadcastMode(List(input[0, string, true]),false), [id=#35]
 +- *(1) Project [value#7 AS a#10]
+- *(1) Filter isnotnull(value#7)
   +- *(1) LocalTableScan [value#7]

{code}

Plan after:
{code}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Union
   :- Project [a#4, b AS b#23]
   :  +- BroadcastHashJoin [a#4], [a#10], Inner, BuildRight, false
   : :- Project [value#1 AS a#4]
   : :  +- Filter isnotnull(value#1)
   : : +- LocalTableScan [value#1]
   : +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, 
true]),false), [id=#115]
   :+- Project [value#7 AS a#10]
   :   +- Filter isnotnull(value#7)
   :  +- LocalTableScan [value#7]
   +- Project [a#4, b AS b#39]
  +- BroadcastHashJoin [a#4], [a#10], Inner, BuildRight, false
 :- Project [value#36 AS a#4]
 :  +- Filter isnotnull(value#36)
 : +- LocalTableScan [value#36]
 +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, 
true]),false), [id=#118]
+- Project [value#37 AS a#10]
   +- Filter isnotnull(value#37)
  +- LocalTableScan [value#37]

{code}

(The InMemoryTableScan is missing)

  was:
Failing unit test:
{code}
  test("SPARK-40664: Cache with join, union and renames") {
val df1 = Seq("1", "2").toDF("a")
val df2 = Seq("2", "3").toDF("a")
  .withColumn("b", lit("b"))

val joined = df1.join(broadcast(df2), "a")
  // Messing around with the column can cause some problems with the cache manager
  .withColumn("tmp_b", $"b")
  .drop("b")
  .withColumnRenamed("tmp_b", "b")
  .cache()

val unioned = joined.union(joined)

assertCached(unioned, 2)
  }
{code}

After this PR the test started failing: 
https://github.com/apache/spark/pull/35214

Plan before:
{code}
Union
:- InMemoryTableScan [a#4, b#23]
: +- InMemoryRelation [a#4, b#23], StorageLevel(disk, memory, deserialized, 
1 replicas)
:   +- *(2) Project [a#4, b AS b#23]
:  +- *(2) BroadcastHashJoin [a#4], [a#10], Inner, BuildRight, false
: :- *(2) Project [value#1 AS a#4]
: :  +- *(2) Filter isnotnull(value#1)
: : +- *(2) LocalTableScan [value#1]
: +- BroadcastExchange 
HashedRelationBroadcastMode(List(input[0, string, true]),false), [id=#35]
:+- *(1) Project [value#7 AS a#10]
:   +- *(1) Filter isnotnull(value#7)
:  +- *(1) LocalTableScan [value#7]
+- InMemoryTableScan [a#4, b#23]
  +- InMemoryRelation [a#4, b#23], StorageLevel(disk, memory, deserialized, 
1 replicas)
+- *(2) Project [a#4, b AS b#23]
   +- *(2) BroadcastHashJoin [a#4], [a#10], Inner, BuildRight, false
  :- *(2) Project [value#1 AS a#4]
  :  +- *(2) Filter isnotnull(value#1)

[jira] [Updated] (SPARK-40664) Union in query can remove cache from the plan

2022-10-05 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-40664:
---
Description: 
Failing unit test:
{code}
  test("SPARK-40664: Cache with join, union and renames") {
val df1 = Seq("1", "2").toDF("a")
val df2 = Seq("2", "3").toDF("a")
  .withColumn("b", lit("b"))

val joined = df1.join(broadcast(df2), "a")
  // Messing around with the column can cause some problems with the cache manager
  .withColumn("tmp_b", $"b")
  .drop("b")
  .withColumnRenamed("tmp_b", "b")
  .cache()

val unioned = joined.union(joined)

assertCached(unioned, 2)
  }
{code}

After this PR the test started failing: 
https://github.com/apache/spark/pull/35214

Plan before:
{code}
Union
:- InMemoryTableScan [a#4, b#23]
: +- InMemoryRelation [a#4, b#23], StorageLevel(disk, memory, deserialized, 
1 replicas)
:   +- *(2) Project [a#4, b AS b#23]
:  +- *(2) BroadcastHashJoin [a#4], [a#10], Inner, BuildRight, false
: :- *(2) Project [value#1 AS a#4]
: :  +- *(2) Filter isnotnull(value#1)
: : +- *(2) LocalTableScan [value#1]
: +- BroadcastExchange 
HashedRelationBroadcastMode(List(input[0, string, true]),false), [id=#35]
:+- *(1) Project [value#7 AS a#10]
:   +- *(1) Filter isnotnull(value#7)
:  +- *(1) LocalTableScan [value#7]
+- InMemoryTableScan [a#4, b#23]
  +- InMemoryRelation [a#4, b#23], StorageLevel(disk, memory, deserialized, 
1 replicas)
+- *(2) Project [a#4, b AS b#23]
   +- *(2) BroadcastHashJoin [a#4], [a#10], Inner, BuildRight, false
  :- *(2) Project [value#1 AS a#4]
  :  +- *(2) Filter isnotnull(value#1)
  : +- *(2) LocalTableScan [value#1]
  +- BroadcastExchange 
HashedRelationBroadcastMode(List(input[0, string, true]),false), [id=#35]
 +- *(1) Project [value#7 AS a#10]
+- *(1) Filter isnotnull(value#7)
   +- *(1) LocalTableScan [value#7]
{code}

Plan after:
{code}
AdaptiveSparkPlan isFinalPlan=false
+- Union
   :- Project [a#4, b AS b#23]
   :  +- BroadcastHashJoin [a#4], [a#10], Inner, BuildRight, false
   : :- Project [value#1 AS a#4]
   : :  +- Filter isnotnull(value#1)
   : : +- LocalTableScan [value#1]
   : +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, 
true]),false), [id=#115]
   :+- Project [value#7 AS a#10]
   :   +- Filter isnotnull(value#7)
   :  +- LocalTableScan [value#7]
   +- Project [a#4, b AS b#39]
  +- BroadcastHashJoin [a#4], [a#10], Inner, BuildRight, false
 :- Project [value#36 AS a#4]
 :  +- Filter isnotnull(value#36)
 : +- LocalTableScan [value#36]
 +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, 
true]),false), [id=#118]
+- Project [value#37 AS a#10]
   +- Filter isnotnull(value#37)
  +- LocalTableScan [value#37]
{code}

(The InMemoryTableScan is missing)
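A small diagnostic sketch that mirrors what assertCached(unioned, 2) checks: count the 
InMemoryRelation nodes that remain in the plan after the cache lookup.
{code}
import org.apache.spark.sql.execution.columnar.InMemoryRelation

// `unioned` is the DataFrame from the failing test above.
val cachedRelations = unioned.queryExecution.withCachedData.collect {
  case r: InMemoryRelation => r
}
// Expected 2 (one per union branch); with this regression the cached plan is
// no longer picked up for the union branches.
println(s"InMemoryRelation nodes: ${cachedRelations.length}")
{code}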

> Union in query can remove cache from the plan
> -
>
> Key: SPARK-40664
> URL: https://issues.apache.org/jira/browse/SPARK-40664
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Tanel Kiis
>Priority: Major
>
> Failing unit test:
> {code}
>   test("SPARK-40664: Cache with join, union and renames") {
> val df1 = Seq("1", "2").toDF("a")
> val df2 = Seq("2", "3").toDF("a")
>   .withColumn("b", lit("b"))
> val joined = df1.join(broadcast(df2), "a")
>   // Messing around with the column can cause some problems with the cache manager
>   .withColumn("tmp_b", $"b")
>   .drop("b")
>   .withColumnRenamed("tmp_b", "b")
>   .cache()
> val unioned = joined.union(joined)
> assertCached(unioned, 2)
>   }
> {code}
> After this PR the test started failing: 
> https://github.com/apache/spark/pull/35214
> Plan before:
> {code}
> Union
> :- InMemoryTableScan [a#4, b#23]
> : +- InMemoryRelation [a#4, b#23], StorageLevel(disk, memory, 
> deserialized, 1 replicas)
> :   +- *(2) Project [a#4, b AS b#23]
> :  +- *(2) BroadcastHashJoin [a#4], [a#10], Inner, BuildRight, 
> false
> : :- *(2) Project [value#1 AS a#4]
> : :  +- *(2) Filter isnotnull(value#1)
> : : +- *(2) LocalTableScan [value#1]
> : +- BroadcastExchange 
> HashedRelationBroadcastMode(List(input[0, string, true]),false), [id=#35]
> :+- *(1) Project [value#7 AS a#10]
> :   +- *(1) Filter isnotnull(value#7)
> : 

[jira] [Commented] (SPARK-40664) Union in query can remove cache from the plan

2022-10-05 Thread Tanel Kiis (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-40664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17612881#comment-17612881
 ] 

Tanel Kiis commented on SPARK-40664:


I do not think that https://github.com/apache/spark/pull/35214 has a bug; it 
instead revealed an existing bug in cache management.

> Union in query can remove cache from the plan
> -
>
> Key: SPARK-40664
> URL: https://issues.apache.org/jira/browse/SPARK-40664
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Tanel Kiis
>Priority: Major
>
> Failing unit test:
> {code}
>   test("SPARK-40664: Cache with join, union and renames") {
> val df1 = Seq("1", "2").toDF("a")
> val df2 = Seq("2", "3").toDF("a")
>   .withColumn("b", lit("b"))
> val joined = df1.join(broadcast(df2), "a")
>   // Messing around with the column can cause some problems with the cache manager
>   .withColumn("tmp_b", $"b")
>   .drop("b")
>   .withColumnRenamed("tmp_b", "b")
>   .cache()
> val unioned = joined.union(joined)
> assertCached(unioned, 2)
>   }
> {code}
> After this PR the test started failing: 
> https://github.com/apache/spark/pull/35214
> Plan before:
> {code}
> Union
> :- InMemoryTableScan [a#4, b#23]
> : +- InMemoryRelation [a#4, b#23], StorageLevel(disk, memory, 
> deserialized, 1 replicas)
> :   +- *(2) Project [a#4, b AS b#23]
> :  +- *(2) BroadcastHashJoin [a#4], [a#10], Inner, BuildRight, 
> false
> : :- *(2) Project [value#1 AS a#4]
> : :  +- *(2) Filter isnotnull(value#1)
> : : +- *(2) LocalTableScan [value#1]
> : +- BroadcastExchange 
> HashedRelationBroadcastMode(List(input[0, string, true]),false), [id=#35]
> :+- *(1) Project [value#7 AS a#10]
> :   +- *(1) Filter isnotnull(value#7)
> :  +- *(1) LocalTableScan [value#7]
> +- InMemoryTableScan [a#4, b#23]
>   +- InMemoryRelation [a#4, b#23], StorageLevel(disk, memory, 
> deserialized, 1 replicas)
> +- *(2) Project [a#4, b AS b#23]
>+- *(2) BroadcastHashJoin [a#4], [a#10], Inner, BuildRight, 
> false
>   :- *(2) Project [value#1 AS a#4]
>   :  +- *(2) Filter isnotnull(value#1)
>   : +- *(2) LocalTableScan [value#1]
>   +- BroadcastExchange 
> HashedRelationBroadcastMode(List(input[0, string, true]),false), [id=#35]
>  +- *(1) Project [value#7 AS a#10]
> +- *(1) Filter isnotnull(value#7)
>+- *(1) LocalTableScan [value#7]
> {code}
> Plan after:
> {code}
> AdaptiveSparkPlan isFinalPlan=false
> +- Union
>:- Project [a#4, b AS b#23]
>:  +- BroadcastHashJoin [a#4], [a#10], Inner, BuildRight, false
>: :- Project [value#1 AS a#4]
>: :  +- Filter isnotnull(value#1)
>: : +- LocalTableScan [value#1]
>: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, 
> string, true]),false), [id=#115]
>:+- Project [value#7 AS a#10]
>:   +- Filter isnotnull(value#7)
>:  +- LocalTableScan [value#7]
>+- Project [a#4, b AS b#39]
>   +- BroadcastHashJoin [a#4], [a#10], Inner, BuildRight, false
>  :- Project [value#36 AS a#4]
>  :  +- Filter isnotnull(value#36)
>  : +- LocalTableScan [value#36]
>  +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, 
> string, true]),false), [id=#118]
> +- Project [value#37 AS a#10]
>+- Filter isnotnull(value#37)
>   +- LocalTableScan [value#37]
> {code}
> (The InMemoryTableScan is missing)






[jira] [Created] (SPARK-40664) Union in query can remove cache from the plan

2022-10-05 Thread Tanel Kiis (Jira)
Tanel Kiis created SPARK-40664:
--

 Summary: Union in query can remove cache from the plan
 Key: SPARK-40664
 URL: https://issues.apache.org/jira/browse/SPARK-40664
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.3.0
Reporter: Tanel Kiis









[jira] [Commented] (SPARK-38485) Non-deterministic UDF executed multiple times when combined with withField

2022-04-17 Thread Tanel Kiis (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17523363#comment-17523363
 ] 

Tanel Kiis commented on SPARK-38485:


Is there even any point in having non-deterministic methods in Spark, then? Some 
optimizations are disabled for them to avoid similar situations.

> Non-deterministic UDF executed multiple times when combined with withField
> --
>
> Key: SPARK-38485
> URL: https://issues.apache.org/jira/browse/SPARK-38485
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Tanel Kiis
>Priority: Major
>  Labels: Correctness
>
> When adding fields to the result of a non-deterministic UDF that returns a 
> struct, that UDF is executed multiple times (once per field) for each row.
> In this UT df1 passes, but df2 fails with something like:
> "279751724 did not equal -1023188908"
> {code}
>   test("SPARK-X: non-deterministic UDF should be called once when adding 
> fields") {
> val nondeterministicUDF = udf((s: Int) => {
>   val r = Random.nextInt()
>   // Both values should be the same
>   GroupByKey(r, r)
> }).asNondeterministic()
> val df1 = spark.range(5).select(nondeterministicUDF($"id"))
> df1.collect().foreach {
>   row => assert(row.getStruct(0).getInt(0) == row.getStruct(0).getInt(1))
> }
> val df2 = 
> spark.range(5).select(nondeterministicUDF($"id").withField("new", lit(7)))
> df2.collect().foreach {
>   row => assert(row.getStruct(0).getInt(0) == row.getStruct(0).getInt(1))
> }
>   }
> {code}






[jira] [Updated] (SPARK-38485) Non-deterministic UDF executed multiple times when combined with withField

2022-03-09 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-38485:
---
Description: 
When adding fields to the result of a non-deterministic UDF that returns a 
struct, that UDF is executed multiple times (once per field) for each row.

In this UT df1 passes, but df2 fails with something like:
"279751724 did not equal -1023188908"

{code}
  test("SPARK-X: non-deterministic UDF should be called once when adding 
fields") {
val nondeterministicUDF = udf((s: Int) => {
  val r = Random.nextInt()
  // Both values should be the same
  GroupByKey(r, r)
}).asNondeterministic()

val df1 = spark.range(5).select(nondeterministicUDF($"id"))
df1.collect().foreach {
  row => assert(row.getStruct(0).getInt(0) == row.getStruct(0).getInt(1))
}

val df2 = spark.range(5).select(nondeterministicUDF($"id").withField("new", 
lit(7)))
df2.collect().foreach {
  row => assert(row.getStruct(0).getInt(0) == row.getStruct(0).getInt(1))
}
  }
{code}
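A quick way to inspect where the duplication comes from (just a diagnostic suggestion):
print the plans of df1 and df2 and compare how often the UDF call appears in the
projection.
{code}
// df1, df2 and nondeterministicUDF as defined in the test above.
df1.explain(true)
df2.explain(true)
// In df2's plans the UDF call is expected to show up once per struct field in
// the projection, which matches the "executed once per field" behaviour above.
{code}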

  was:
When adding fields to the result of a non-deterministic UDF that returns a 
struct, that UDF is executed multiple times (once per field) for each row.

In this UT df1 passes, but df2 fails with something like:
"279751724 did not equal -1023188908"

{code}
  test("SPARK-X: non-deterministic UDF should be called once when adding 
fields") {
val nondeterministicUDF = udf((s: Int) => {
  val r = Random.nextInt()
  // Both values should be the same
  GroupByKey(r, r)
}).asNondeterministic()

val df1 = spark.range(5).select(
  nondeterministicUDF($"id").as("struct"))
df1.collect().foreach {
  row => assert(row.getStruct(0).getInt(0) == row.getStruct(0).getInt(1))
}

val df2 = spark.range(5).select(
  nondeterministicUDF($"id").withField("new", lit(7)).as("struct"))
df2.collect().foreach {
  row => assert(row.getStruct(0).getInt(0) == row.getStruct(0).getInt(1))
}
  }
{code}


> Non-deterministic UDF executed multiple times when combined with withField
> --
>
> Key: SPARK-38485
> URL: https://issues.apache.org/jira/browse/SPARK-38485
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Tanel Kiis
>Priority: Major
>  Labels: Correctness
>
> When adding fields to the result of a non-deterministic UDF that returns a 
> struct, that UDF is executed multiple times (once per field) for each row.
> In this UT df1 passes, but df2 fails with something like:
> "279751724 did not equal -1023188908"
> {code}
>   test("SPARK-X: non-deterministic UDF should be called once when adding 
> fields") {
> val nondeterministicUDF = udf((s: Int) => {
>   val r = Random.nextInt()
>   // Both values should be the same
>   GroupByKey(r, r)
> }).asNondeterministic()
> val df1 = spark.range(5).select(nondeterministicUDF($"id"))
> df1.collect().foreach {
>   row => assert(row.getStruct(0).getInt(0) == row.getStruct(0).getInt(1))
> }
> val df2 = 
> spark.range(5).select(nondeterministicUDF($"id").withField("new", lit(7)))
> df2.collect().foreach {
>   row => assert(row.getStruct(0).getInt(0) == row.getStruct(0).getInt(1))
> }
>   }
> {code}






[jira] [Created] (SPARK-38485) Non-deterministic UDF executed multiple times when combined with withField

2022-03-09 Thread Tanel Kiis (Jira)
Tanel Kiis created SPARK-38485:
--

 Summary: Non-deterministic UDF executed multiple times when 
combined with withField
 Key: SPARK-38485
 URL: https://issues.apache.org/jira/browse/SPARK-38485
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.3.0
Reporter: Tanel Kiis


When adding fields to the result of a non-deterministic UDF that returns a 
struct, that UDF is executed multiple times (once per field) for each row.

In this UT df1 passes, but df2 fails with something like:
"279751724 did not equal -1023188908"

{code}
  test("SPARK-X: non-deterministic UDF should be called once when adding 
fields") {
val nondeterministicUDF = udf((s: Int) => {
  val r = Random.nextInt()
  // Both values should be the same
  GroupByKey(r, r)
}).asNondeterministic()

val df1 = spark.range(5).select(
  nondeterministicUDF($"id").as("struct"))
df1.collect().foreach {
  row => assert(row.getStruct(0).getInt(0) == row.getStruct(0).getInt(1))
}

val df2 = spark.range(5).select(
  nondeterministicUDF($"id").withField("new", lit(7)).as("struct"))
df2.collect().foreach {
  row => assert(row.getStruct(0).getInt(0) == row.getStruct(0).getInt(1))
}
  }
{code}






[jira] [Commented] (SPARK-38282) Avoid duplicating complex partitioning expressions

2022-02-21 Thread Tanel Kiis (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17495899#comment-17495899
 ] 

Tanel Kiis commented on SPARK-38282:


[~cloud_fan], any ideas on how to improve this? 
I could submit a PR, but I'm not sure what would be the best approach here.

> Avoid duplicating complex partitioning expressions
> --
>
> Key: SPARK-38282
> URL: https://issues.apache.org/jira/browse/SPARK-38282
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Tanel Kiis
>Priority: Major
>
> Spark will duplicate all non-trivial expressions in Window.partitionBy, which 
> results in duplicate exchanges and WindowExec nodes.
> An example unit test:
> {code}
>   test("SPARK-38282: Avoid duplicating complex partitioning expressions") {
> val group = functions.col("id") % 2
> val min = functions.min("id").over(Window.partitionBy(group))
> val max = functions.max("id").over(Window.partitionBy(group))
> val df1 = spark.range(1, 4)
>   .withColumn("ratio", max / min)
> val df2 = spark.range(1, 4)
>   .withColumn("min", min)
>   .withColumn("max", max)
>   .select(col("id"), (col("max") / col("min")).as("ratio"))
> Seq(df1, df2).foreach { df =>
>   checkAnswer(
> df,
> Seq(Row(1L, 3.0), Row(2L, 1.0), Row(3L, 3.0)))
>   val windows = collect(df.queryExecution.executedPlan) {
> case w: WindowExec => w
>   }
>   assert(windows.size == 1)
> }
>   }
> {code}
> The query plan for this (_w0#5L and _w1#6L are duplicates):
> {code}
> Window [min(id#2L) windowspecdefinition(_w1#6L, 
> specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) 
> AS _we1#8L], [_w1#6L]
>+- *(4) Sort [_w1#6L ASC NULLS FIRST], false, 0
>   +- AQEShuffleRead coalesced
>  +- ShuffleQueryStage 1
> +- Exchange hashpartitioning(_w1#6L, 5), ENSURE_REQUIREMENTS, 
> [id=#256]
>+- *(3) Project [id#2L, _w1#6L, _we0#7L]
>   +- Window [max(id#2L) windowspecdefinition(_w0#5L, 
> specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) 
> AS _we0#7L], [_w0#5L]
>  +- *(2) Sort [_w0#5L ASC NULLS FIRST], false, 0
> +- AQEShuffleRead coalesced
>+- ShuffleQueryStage 0
>   +- Exchange hashpartitioning(_w0#5L, 5), 
> ENSURE_REQUIREMENTS, [id=#203]
>  +- *(1) Project [id#2L, (id#2L % 2) AS 
> _w0#5L, (id#2L % 2) AS _w1#6L]
> +- *(1) Range (1, 4, step=1, splits=2)
> {code}






[jira] [Updated] (SPARK-38282) Avoid duplicating complex partitioning expressions

2022-02-21 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-38282:
---
Description: 
Spark will duplicate all non-trivial expressions in Window.partitionBy, which 
results in duplicate exchanges and WindowExec nodes.

An example unit test:
{code}
  test("SPARK-38282: Avoid duplicating complex partitioning expressions") {
val group = functions.col("id") % 2
val min = functions.min("id").over(Window.partitionBy(group))
val max = functions.max("id").over(Window.partitionBy(group))

val df1 = spark.range(1, 4)
  .withColumn("ratio", max / min)

val df2 = spark.range(1, 4)
  .withColumn("min", min)
  .withColumn("max", max)
  .select(col("id"), (col("max") / col("min")).as("ratio"))

Seq(df1, df2).foreach { df =>
  checkAnswer(
df,
Seq(Row(1L, 3.0), Row(2L, 1.0), Row(3L, 3.0)))

  val windows = collect(df.queryExecution.executedPlan) {
case w: WindowExec => w
  }
  assert(windows.size == 1)
}
  }
{code}

The query plan for this (_w0#5L and _w1#6L are duplicates):
{code}
Window [min(id#2L) windowspecdefinition(_w1#6L, specifiedwindowframe(RowFrame, 
unboundedpreceding$(), unboundedfollowing$())) AS _we1#8L], [_w1#6L]
   +- *(4) Sort [_w1#6L ASC NULLS FIRST], false, 0
  +- AQEShuffleRead coalesced
 +- ShuffleQueryStage 1
+- Exchange hashpartitioning(_w1#6L, 5), ENSURE_REQUIREMENTS, 
[id=#256]
   +- *(3) Project [id#2L, _w1#6L, _we0#7L]
  +- Window [max(id#2L) windowspecdefinition(_w0#5L, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) 
AS _we0#7L], [_w0#5L]
 +- *(2) Sort [_w0#5L ASC NULLS FIRST], false, 0
+- AQEShuffleRead coalesced
   +- ShuffleQueryStage 0
  +- Exchange hashpartitioning(_w0#5L, 5), 
ENSURE_REQUIREMENTS, [id=#203]
 +- *(1) Project [id#2L, (id#2L % 2) AS _w0#5L, 
(id#2L % 2) AS _w1#6L]
+- *(1) Range (1, 4, step=1, splits=2)
{code}
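A possible workaround sketch until the duplication is fixed (my suggestion, not verified 
by a benchmark): materialize the partitioning expression as a named column first, so both 
window functions partition by the same attribute and should be able to share a single 
exchange and WindowExec.
{code}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions

// Same result as df1 above, but partitioning by a pre-computed "group" column
// instead of repeating the (id % 2) expression inside each window spec.
val byGroup = Window.partitionBy(functions.col("group"))
val df1Workaround = spark.range(1, 4)
  .withColumn("group", functions.col("id") % 2)
  .withColumn("ratio",
    functions.max("id").over(byGroup) / functions.min("id").over(byGroup))
  .drop("group")
{code}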

  was:

{code}
  test("SPARK-X: Avoid duplicating complex partitioning expressions") {
val group = functions.col("id") % 2
val min = functions.min("id").over(Window.partitionBy(group))
val max = functions.max("id").over(Window.partitionBy(group))

val df1 = spark.range(1, 4)
  .withColumn("ratio", max / min)

val df2 = spark.range(1, 4)
  .withColumn("min", min)
  .withColumn("max", max)
  .select(col("id"), (col("max") / col("min")).as("ratio"))

Seq(df1, df2).foreach { df =>
  checkAnswer(
df,
Seq(Row(1L, 3.0), Row(2L, 1.0), Row(3L, 3.0)))

  val windows = collect(df.queryExecution.executedPlan) {
case w: WindowExec => w
  }
  assert(windows.size == 1)
}
  }
{code}


> Avoid duplicating complex partitioning expressions
> --
>
> Key: SPARK-38282
> URL: https://issues.apache.org/jira/browse/SPARK-38282
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Tanel Kiis
>Priority: Major
>
> Spark will duplicate all non-trivial expressions in Window.partitionBy, which 
> results in duplicate exchanges and WindowExec nodes.
> An example unit test:
> {code}
>   test("SPARK-38282: Avoid duplicating complex partitioning expressions") {
> val group = functions.col("id") % 2
> val min = functions.min("id").over(Window.partitionBy(group))
> val max = functions.max("id").over(Window.partitionBy(group))
> val df1 = spark.range(1, 4)
>   .withColumn("ratio", max / min)
> val df2 = spark.range(1, 4)
>   .withColumn("min", min)
>   .withColumn("max", max)
>   .select(col("id"), (col("max") / col("min")).as("ratio"))
> Seq(df1, df2).foreach { df =>
>   checkAnswer(
> df,
> Seq(Row(1L, 3.0), Row(2L, 1.0), Row(3L, 3.0)))
>   val windows = collect(df.queryExecution.executedPlan) {
> case w: WindowExec => w
>   }
>   assert(windows.size == 1)
> }
>   }
> {code}
> The query plan for this (_w0#5L and _w1#6L are duplicates):
> {code}
> Window [min(id#2L) windowspecdefinition(_w1#6L, 
> specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) 
> AS _we1#8L], [_w1#6L]
>+- *(4) Sort [_w1#6L ASC NULLS FIRST], false, 0
>   +- AQEShuffleRead coalesced
>  +- ShuffleQueryStage 1
> +- Exchange hashpartitioning(_w1#6L, 5), ENSURE_REQUIREMENTS, 
> [id=#256]
>+- *(3) Project [id#2L, _w1#6L, _we0#7L]
>   +- Window [max(id#2L) windowspecdefinition(_w0#5L, 
> specifiedwindowframe(RowFrame, unboundedpreceding$(), 

[jira] [Updated] (SPARK-38282) Avoid duplicating complex partitioning expressions

2022-02-21 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-38282:
---
Description: 

{code}
  test("SPARK-X: Avoid duplicating complex partitioning expressions") {
val group = functions.col("id") % 2
val min = functions.min("id").over(Window.partitionBy(group))
val max = functions.max("id").over(Window.partitionBy(group))

val df1 = spark.range(1, 4)
  .withColumn("ratio", max / min)

val df2 = spark.range(1, 4)
  .withColumn("min", min)
  .withColumn("max", max)
  .select(col("id"), (col("max") / col("min")).as("ratio"))

Seq(df1, df2).foreach { df =>
  checkAnswer(
df,
Seq(Row(1L, 3.0), Row(2L, 1.0), Row(3L, 3.0)))

  val windows = collect(df.queryExecution.executedPlan) {
case w: WindowExec => w
  }
  assert(windows.size == 1)
}
  }
{code}

  was:

{code}

  test("SPARK-X: Avoid duplicating complex partitioning expressions") {
val group = functions.col("id") % 2
val min = functions.min("id").over(Window.partitionBy(group))
val max = functions.max("id").over(Window.partitionBy(group))

val df1 = spark.range(1, 4)
  .withColumn("ratio", max / min)

val df2 = spark.range(1, 4)
  .withColumn("min", min)
  .withColumn("max", max)
  .select(col("id"), (col("max") / col("min")).as("ratio"))

Seq(df1, df2).foreach { df =>
  checkAnswer(
df,
Seq(Row(1L, 3.0), Row(2L, 1.0), Row(3L, 3.0)))

  val windows = collect(df.queryExecution.executedPlan) {
case w: WindowExec => w
  }
  assert(windows.size == 1)
}
  }
{code}


> Avoid duplicating complex partitioning expressions
> --
>
> Key: SPARK-38282
> URL: https://issues.apache.org/jira/browse/SPARK-38282
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Tanel Kiis
>Priority: Major
>
> {code}
>   test("SPARK-X: Avoid duplicating complex partitioning expressions") {
> val group = functions.col("id") % 2
> val min = functions.min("id").over(Window.partitionBy(group))
> val max = functions.max("id").over(Window.partitionBy(group))
> val df1 = spark.range(1, 4)
>   .withColumn("ratio", max / min)
> val df2 = spark.range(1, 4)
>   .withColumn("min", min)
>   .withColumn("max", max)
>   .select(col("id"), (col("max") / col("min")).as("ratio"))
> Seq(df1, df2).foreach { df =>
>   checkAnswer(
> df,
> Seq(Row(1L, 3.0), Row(2L, 1.0), Row(3L, 3.0)))
>   val windows = collect(df.queryExecution.executedPlan) {
> case w: WindowExec => w
>   }
>   assert(windows.size == 1)
> }
>   }
> {code}






[jira] [Created] (SPARK-38282) Avoid duplicating complex partitioning expressions

2022-02-21 Thread Tanel Kiis (Jira)
Tanel Kiis created SPARK-38282:
--

 Summary: Avoid duplicating complex partitioning expressions
 Key: SPARK-38282
 URL: https://issues.apache.org/jira/browse/SPARK-38282
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.3.0
Reporter: Tanel Kiis



{code}

  test("SPARK-X: Avoid duplicating complex partitioning expressions") {
val group = functions.col("id") % 2
val min = functions.min("id").over(Window.partitionBy(group))
val max = functions.max("id").over(Window.partitionBy(group))

val df1 = spark.range(1, 4)
  .withColumn("ratio", max / min)

val df2 = spark.range(1, 4)
  .withColumn("min", min)
  .withColumn("max", max)
  .select(col("id"), (col("max") / col("min")).as("ratio"))

Seq(df1, df2).foreach { df =>
  checkAnswer(
df,
Seq(Row(1L, 3.0), Row(2L, 1.0), Row(3L, 3.0)))

  val windows = collect(df.queryExecution.executedPlan) {
case w: WindowExec => w
  }
  assert(windows.size == 1)
}
  }
{code}






[jira] [Created] (SPARK-37538) Replace single projection Expand with Project

2021-12-03 Thread Tanel Kiis (Jira)
Tanel Kiis created SPARK-37538:
--

 Summary: Replace single projection Expand with Project
 Key: SPARK-37538
 URL: https://issues.apache.org/jira/browse/SPARK-37538
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.3.0
Reporter: Tanel Kiis









[jira] [Updated] (SPARK-37487) CollectMetrics is executed twice if it is followed by a sort

2021-11-29 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-37487:
---
Description: 
It is best exemplified by this new UT in DataFrameCallbackSuite:
{code}
  test("SPARK-37487: get observable metrics with sort by callback") {
val df = spark.range(100)
  .observe(
name = "my_event",
min($"id").as("min_val"),
max($"id").as("max_val"),
// Test unresolved alias
sum($"id"),
count(when($"id" % 2 === 0, 1)).as("num_even"))
  .observe(
name = "other_event",
avg($"id").cast("int").as("avg_val"))
  .sort($"id".desc)

validateObservedMetrics(df)
  }
{code}

The count and sum aggregates report twice the number of rows:
{code}
[info] - SPARK-37487: get observable metrics with sort by callback *** FAILED 
*** (169 milliseconds)
[info]   [0,99,9900,100] did not equal [0,99,4950,50] 
(DataFrameCallbackSuite.scala:342)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at 
org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at 
org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at 
org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at 
org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at 
org.apache.spark.sql.util.DataFrameCallbackSuite.checkMetrics$1(DataFrameCallbackSuite.scala:342)
[info]   at 
org.apache.spark.sql.util.DataFrameCallbackSuite.validateObservedMetrics(DataFrameCallbackSuite.scala:350)
[info]   at 
org.apache.spark.sql.util.DataFrameCallbackSuite.$anonfun$new$21(DataFrameCallbackSuite.scala:324)
[info]   at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
{code}

I could not figure out how this happens. Hopefully the UT can help with debugging.
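A minimal reproduction sketch outside the callback suite, using the Observation helper 
available in 3.3.0 (this assumes Observation goes through the same CollectMetrics node 
as the listener-based test):
{code}
import org.apache.spark.sql.Observation
import org.apache.spark.sql.functions._

val obs = Observation("my_event")
val df = spark.range(100)
  .observe(obs, count(lit(1)).as("rows"), sum(col("id")).as("total"))
  .sort(col("id").desc)
df.collect()
// With this bug the metrics should come back doubled (rows = 200, total = 9900)
// instead of the expected 100 and 4950.
println(obs.get)
{code}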

  was:
It is best exemplified by this new UT in DataFrameCallbackSuite:
{code}
  test("SPARK-X: get observable metrics with sort by callback") {
val df = spark.range(100)
  .observe(
name = "my_event",
min($"id").as("min_val"),
max($"id").as("max_val"),
// Test unresolved alias
sum($"id"),
count(when($"id" % 2 === 0, 1)).as("num_even"))
  .observe(
name = "other_event",
avg($"id").cast("int").as("avg_val"))
  .sort($"id".desc)

validateObservedMetrics(df)
  }
{code}

The count and sum aggregates report twice the number of rows:
{code}
[info] - SPARK-X: get observable metrics with sort by callback *** FAILED 
*** (169 milliseconds)
[info]   [0,99,9900,100] did not equal [0,99,4950,50] 
(DataFrameCallbackSuite.scala:342)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at 
org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at 
org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at 
org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at 
org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at 
org.apache.spark.sql.util.DataFrameCallbackSuite.checkMetrics$1(DataFrameCallbackSuite.scala:342)
[info]   at 
org.apache.spark.sql.util.DataFrameCallbackSuite.validateObservedMetrics(DataFrameCallbackSuite.scala:350)
[info]   at 
org.apache.spark.sql.util.DataFrameCallbackSuite.$anonfun$new$21(DataFrameCallbackSuite.scala:324)
[info]   at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
{code}

I could not figure out how this happens. Hopefully the UT can help with debugging.


> CollectMetrics is executed twice if it is followed by a sort
> 
>
> Key: SPARK-37487
> URL: https://issues.apache.org/jira/browse/SPARK-37487
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Tanel Kiis
>Priority: Major
>  Labels: correctness
>
> It is best exemplified by this new UT in DataFrameCallbackSuite:
> {code}
>   test("SPARK-37487: get observable metrics with sort by callback") {
> val df = spark.range(100)

[jira] [Updated] (SPARK-37487) CollectMetrics is executed twice if it is followed by a sort

2021-11-29 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-37487:
---
Labels: correctness  (was: )

> CollectMetrics is executed twice if it is followed by a sort
> 
>
> Key: SPARK-37487
> URL: https://issues.apache.org/jira/browse/SPARK-37487
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Tanel Kiis
>Priority: Major
>  Labels: correctness
>
> It is best exemplified by this new UT in DataFrameCallbackSuite:
> {code}
>   test("SPARK-X: get observable metrics with sort by callback") {
> val df = spark.range(100)
>   .observe(
> name = "my_event",
> min($"id").as("min_val"),
> max($"id").as("max_val"),
> // Test unresolved alias
> sum($"id"),
> count(when($"id" % 2 === 0, 1)).as("num_even"))
>   .observe(
> name = "other_event",
> avg($"id").cast("int").as("avg_val"))
>   .sort($"id".desc)
> validateObservedMetrics(df)
>   }
> {code}
> The count and sum aggregates report twice the number of rows:
> {code}
> [info] - SPARK-X: get observable metrics with sort by callback *** FAILED 
> *** (169 milliseconds)
> [info]   [0,99,9900,100] did not equal [0,99,4950,50] 
> (DataFrameCallbackSuite.scala:342)
> [info]   org.scalatest.exceptions.TestFailedException:
> [info]   at 
> org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
> [info]   at 
> org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
> [info]   at 
> org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
> [info]   at 
> org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
> [info]   at 
> org.apache.spark.sql.util.DataFrameCallbackSuite.checkMetrics$1(DataFrameCallbackSuite.scala:342)
> [info]   at 
> org.apache.spark.sql.util.DataFrameCallbackSuite.validateObservedMetrics(DataFrameCallbackSuite.scala:350)
> [info]   at 
> org.apache.spark.sql.util.DataFrameCallbackSuite.$anonfun$new$21(DataFrameCallbackSuite.scala:324)
> [info]   at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> [info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
> [info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
> [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
> [info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
> {code}
> I could not figure out how this happens. Hopefully the UT can help with 
> debugging






[jira] [Updated] (SPARK-37487) CollectMetrics is executed twice if it is followed by a sort

2021-11-29 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-37487:
---
Summary: CollectMetrics is executed twice if it is followed by a sort  
(was: CollectMetrics is executed twice if it is followed by an sort)

> CollectMetrics is executed twice if it is followed by a sort
> 
>
> Key: SPARK-37487
> URL: https://issues.apache.org/jira/browse/SPARK-37487
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Tanel Kiis
>Priority: Major
>
> It is best exemplified by this new UT in DataFrameCallbackSuite:
> {code}
>   test("SPARK-X: get observable metrics with sort by callback") {
> val df = spark.range(100)
>   .observe(
> name = "my_event",
> min($"id").as("min_val"),
> max($"id").as("max_val"),
> // Test unresolved alias
> sum($"id"),
> count(when($"id" % 2 === 0, 1)).as("num_even"))
>   .observe(
> name = "other_event",
> avg($"id").cast("int").as("avg_val"))
>   .sort($"id".desc)
> validateObservedMetrics(df)
>   }
> {code}
> The count and sum aggregates report twice the number of rows:
> {code}
> [info] - SPARK-X: get observable metrics with sort by callback *** FAILED 
> *** (169 milliseconds)
> [info]   [0,99,9900,100] did not equal [0,99,4950,50] 
> (DataFrameCallbackSuite.scala:342)
> [info]   org.scalatest.exceptions.TestFailedException:
> [info]   at 
> org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
> [info]   at 
> org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
> [info]   at 
> org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
> [info]   at 
> org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
> [info]   at 
> org.apache.spark.sql.util.DataFrameCallbackSuite.checkMetrics$1(DataFrameCallbackSuite.scala:342)
> [info]   at 
> org.apache.spark.sql.util.DataFrameCallbackSuite.validateObservedMetrics(DataFrameCallbackSuite.scala:350)
> [info]   at 
> org.apache.spark.sql.util.DataFrameCallbackSuite.$anonfun$new$21(DataFrameCallbackSuite.scala:324)
> [info]   at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> [info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
> [info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
> [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
> [info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
> {code}
> I could not figure out how this happens. Hopefully the UT can help with 
> debugging






[jira] [Commented] (SPARK-37487) CollectMetrics is executed twice if it is followed by an sort

2021-11-29 Thread Tanel Kiis (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-37487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17450452#comment-17450452
 ] 

Tanel Kiis commented on SPARK-37487:


[~cloud_fan] and [~sarutak], you helped with the last CollectMetrics bug. 
Perhaps you have some idea why this is happening.


> CollectMetrics is executed twice if it is followed by an sort
> -
>
> Key: SPARK-37487
> URL: https://issues.apache.org/jira/browse/SPARK-37487
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Tanel Kiis
>Priority: Major
>
> It is best exemplified by this new UT in DataFrameCallbackSuite:
> {code}
>   test("SPARK-X: get observable metrics with sort by callback") {
> val df = spark.range(100)
>   .observe(
> name = "my_event",
> min($"id").as("min_val"),
> max($"id").as("max_val"),
> // Test unresolved alias
> sum($"id"),
> count(when($"id" % 2 === 0, 1)).as("num_even"))
>   .observe(
> name = "other_event",
> avg($"id").cast("int").as("avg_val"))
>   .sort($"id".desc)
> validateObservedMetrics(df)
>   }
> {code}
> The count and sum aggregates report twice the number of rows:
> {code}
> [info] - SPARK-X: get observable metrics with sort by callback *** FAILED 
> *** (169 milliseconds)
> [info]   [0,99,9900,100] did not equal [0,99,4950,50] 
> (DataFrameCallbackSuite.scala:342)
> [info]   org.scalatest.exceptions.TestFailedException:
> [info]   at 
> org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
> [info]   at 
> org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
> [info]   at 
> org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
> [info]   at 
> org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
> [info]   at 
> org.apache.spark.sql.util.DataFrameCallbackSuite.checkMetrics$1(DataFrameCallbackSuite.scala:342)
> [info]   at 
> org.apache.spark.sql.util.DataFrameCallbackSuite.validateObservedMetrics(DataFrameCallbackSuite.scala:350)
> [info]   at 
> org.apache.spark.sql.util.DataFrameCallbackSuite.$anonfun$new$21(DataFrameCallbackSuite.scala:324)
> [info]   at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> [info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
> [info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
> [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
> [info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
> {code}
> I could not figure out how this happens. Hopefully the UT can help with 
> debugging






[jira] [Updated] (SPARK-37487) CollectMetrics is executed twice if it is followed by an sort

2021-11-29 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-37487:
---
Description: 
It is best exemplified by this new UT in DataFrameCallbackSuite:
{code}
  test("SPARK-X: get observable metrics with sort by callback") {
val df = spark.range(100)
  .observe(
name = "my_event",
min($"id").as("min_val"),
max($"id").as("max_val"),
// Test unresolved alias
sum($"id"),
count(when($"id" % 2 === 0, 1)).as("num_even"))
  .observe(
name = "other_event",
avg($"id").cast("int").as("avg_val"))
  .sort($"id".desc)

validateObservedMetrics(df)
  }
{code}

The count and sum aggregates report twice the number of rows:
{code}
[info] - SPARK-X: get observable metrics with sort by callback *** FAILED 
*** (169 milliseconds)
[info]   [0,99,9900,100] did not equal [0,99,4950,50] 
(DataFrameCallbackSuite.scala:342)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at 
org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at 
org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at 
org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at 
org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at 
org.apache.spark.sql.util.DataFrameCallbackSuite.checkMetrics$1(DataFrameCallbackSuite.scala:342)
[info]   at 
org.apache.spark.sql.util.DataFrameCallbackSuite.validateObservedMetrics(DataFrameCallbackSuite.scala:350)
[info]   at 
org.apache.spark.sql.util.DataFrameCallbackSuite.$anonfun$new$21(DataFrameCallbackSuite.scala:324)
[info]   at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
{code}

I could not figure out how this happens. Hopefully the UT can help with debugging.

  was:
It is best exemplified by this new UT in DataFrameCallbackSuite:
{code}
  test("SPARK-X: get observable metrics with sort by callback") {
val df = spark.range(100)
  .observe(
name = "my_event",
min($"id").as("min_val"),
max($"id").as("max_val"),
// Test unresolved alias
sum($"id"),
count(when($"id" % 2 === 0, 1)).as("num_even"))
  .observe(
name = "other_event",
avg($"id").cast("int").as("avg_val"))
  .sort($"id".desc)

validateObservedMetrics(df)
  }
{code}

The count aggregate reports twice the number of rows:
{code}
[info] - SPARK-X: get observable metrics with sort by callback *** FAILED 
*** (169 milliseconds)
[info]   [0,99,9900,100] did not equal [0,99,4950,50] 
(DataFrameCallbackSuite.scala:342)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at 
org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at 
org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at 
org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at 
org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at 
org.apache.spark.sql.util.DataFrameCallbackSuite.checkMetrics$1(DataFrameCallbackSuite.scala:342)
[info]   at 
org.apache.spark.sql.util.DataFrameCallbackSuite.validateObservedMetrics(DataFrameCallbackSuite.scala:350)
[info]   at 
org.apache.spark.sql.util.DataFrameCallbackSuite.$anonfun$new$21(DataFrameCallbackSuite.scala:324)
[info]   at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
{code}

I could not figure out how this happens. Hopefully the UT can help with debugging.


> CollectMetrics is executed twice if it is followed by a sort
> -
>
> Key: SPARK-37487
> URL: https://issues.apache.org/jira/browse/SPARK-37487
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Tanel Kiis
>Priority: Major
>
> It is best exemplified by this new UT in DataFrameCallbackSuite:
> {code}
>   test("SPARK-X: get observable metrics with sort by callback") {
> val df = spark.range(100)
>   .observe(
> name = 

[jira] [Updated] (SPARK-37487) CollectMetrics is executed twice if it is followed by a sort

2021-11-29 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-37487:
---
Description: 
It is best exemplified by this new UT in DataFrameCallbackSuite:
{code}
  test("SPARK-X: get observable metrics with sort by callback") {
val df = spark.range(100)
  .observe(
name = "my_event",
min($"id").as("min_val"),
max($"id").as("max_val"),
// Test unresolved alias
sum($"id"),
count(when($"id" % 2 === 0, 1)).as("num_even"))
  .observe(
name = "other_event",
avg($"id").cast("int").as("avg_val"))
  .sort($"id".desc)

validateObservedMetrics(df)
  }
{code}

The count aggregate reports twice the number of rows:
{code}
[info] - SPARK-X: get observable metrics with sort by callback *** FAILED 
*** (169 milliseconds)
[info]   [0,99,9900,100] did not equal [0,99,4950,50] 
(DataFrameCallbackSuite.scala:342)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at 
org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at 
org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at 
org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at 
org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at 
org.apache.spark.sql.util.DataFrameCallbackSuite.checkMetrics$1(DataFrameCallbackSuite.scala:342)
[info]   at 
org.apache.spark.sql.util.DataFrameCallbackSuite.validateObservedMetrics(DataFrameCallbackSuite.scala:350)
[info]   at 
org.apache.spark.sql.util.DataFrameCallbackSuite.$anonfun$new$21(DataFrameCallbackSuite.scala:324)
[info]   at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
{code}

I could not figure out how this happens. Hopefully the UT can help with debugging.

  was:
It is best exemplified by this new UT in DataFrameCallbackSuite:
{code}
  test("SPARK-X: get observable metrics with sort by callback") {
val df = spark.range(100)
  .observe(
name = "my_event",
min($"id").as("min_val"),
max($"id").as("max_val"),
// Test unresolved alias
sum($"id"),
count(when($"id" % 2 === 0, 1)).as("num_even"))
  .observe(
name = "other_event",
avg($"id").cast("int").as("avg_val"))
  .sort($"id".desc)

validateObservedMetrics(df)
  }
{code}

The count aggregate reports twice the number of rows:
{code}
[info] - SPARK-X: get observable metrics with sort by callback *** FAILED 
*** (169 milliseconds)
[info]   [0,99,9900,100] did not equal [0,99,4950,50] 
(DataFrameCallbackSuite.scala:342)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at 
org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at 
org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at 
org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at 
org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at 
org.apache.spark.sql.util.DataFrameCallbackSuite.checkMetrics$1(DataFrameCallbackSuite.scala:342)
[info]   at 
org.apache.spark.sql.util.DataFrameCallbackSuite.validateObservedMetrics(DataFrameCallbackSuite.scala:350)
[info]   at 
org.apache.spark.sql.util.DataFrameCallbackSuite.$anonfun$new$21(DataFrameCallbackSuite.scala:324)
[info]   at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
{code}

I could not figure out how this happens. Hopefully the UT can help with debugging.


> CollectMetrics is executed twice if it is followed by a sort
> -
>
> Key: SPARK-37487
> URL: https://issues.apache.org/jira/browse/SPARK-37487
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Tanel Kiis
>Priority: Major
>
> It is best exemplified by this new UT in DataFrameCallbackSuite:
> {code}
>   test("SPARK-X: get observable metrics with sort by callback") {
> val df = spark.range(100)
>   .observe(
> name = "my_event",

[jira] [Updated] (SPARK-37487) CollectMetrics is executed twice if it is followed by a sort

2021-11-29 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-37487:
---
Description: 
It is best exemplified by this new UT in DataFrameCallbackSuite:
{code}
  test("SPARK-X: get observable metrics with sort by callback") {
val df = spark.range(100)
  .observe(
name = "my_event",
min($"id").as("min_val"),
max($"id").as("max_val"),
// Test unresolved alias
sum($"id"),
count(when($"id" % 2 === 0, 1)).as("num_even"))
  .observe(
name = "other_event",
avg($"id").cast("int").as("avg_val"))
  .sort($"id".desc)

validateObservedMetrics(df)
  }
{code}

The count and sum aggregates report twice the number of rows:
{code}
[info] - SPARK-X: get observable metrics with sort by callback *** FAILED 
*** (169 milliseconds)
[info]   [0,99,9900,100] did not equal [0,99,4950,50] 
(DataFrameCallbackSuite.scala:342)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at 
org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at 
org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at 
org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at 
org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at 
org.apache.spark.sql.util.DataFrameCallbackSuite.checkMetrics$1(DataFrameCallbackSuite.scala:342)
[info]   at 
org.apache.spark.sql.util.DataFrameCallbackSuite.validateObservedMetrics(DataFrameCallbackSuite.scala:350)
[info]   at 
org.apache.spark.sql.util.DataFrameCallbackSuite.$anonfun$new$21(DataFrameCallbackSuite.scala:324)
[info]   at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
{code}

I could not figure out how this happens. Hopefully the UT can help with debugging.

  was:
It is best exemplified by this new UT in DataFrameCallbackSuite:
{code}
  test("SPARK-X: get observable metrics with sort by callback") {
val df = spark.range(100)
  .observe(
name = "my_event",
min($"id").as("min_val"),
max($"id").as("max_val"),
// Test unresolved alias
sum($"id"),
count(when($"id" % 2 === 0, 1)).as("num_even"))
  .observe(
name = "other_event",
avg($"id").cast("int").as("avg_val"))
  .sort($"id".desc)

validateObservedMetrics(df)
  }
{code}

The count and sum aggregates report twice the number of rows:
{code}
[info] - SPARK-X: get observable metrics with sort by callback *** FAILED 
*** (169 milliseconds)
[info]   [0,99,9900,100] did not equal [0,99,4950,50] 
(DataFrameCallbackSuite.scala:342)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at 
org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at 
org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at 
org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at 
org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at 
org.apache.spark.sql.util.DataFrameCallbackSuite.checkMetrics$1(DataFrameCallbackSuite.scala:342)
[info]   at 
org.apache.spark.sql.util.DataFrameCallbackSuite.validateObservedMetrics(DataFrameCallbackSuite.scala:350)
[info]   at 
org.apache.spark.sql.util.DataFrameCallbackSuite.$anonfun$new$21(DataFrameCallbackSuite.scala:324)
[info]   at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
{code}

I could not figure out how this happens. Hopefully the UT can help with debugging.


> CollectMetrics is executed twice if it is followed by a sort
> -
>
> Key: SPARK-37487
> URL: https://issues.apache.org/jira/browse/SPARK-37487
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Tanel Kiis
>Priority: Major
>
> It is best exemplified by this new UT in DataFrameCallbackSuite:
> {code}
>   test("SPARK-X: get observable metrics with sort by callback") {
> val df = spark.range(100)
>   .observe(
> name 

[jira] [Created] (SPARK-37487) CollectMetrics is executed twice if it is followed by a sort

2021-11-29 Thread Tanel Kiis (Jira)
Tanel Kiis created SPARK-37487:
--

 Summary: CollectMetrics is executed twice if it is followed by a
sort
 Key: SPARK-37487
 URL: https://issues.apache.org/jira/browse/SPARK-37487
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.3.0
Reporter: Tanel Kiis


It is best exemplified by this new UT in DataFrameCallbackSuite:
{code}
  test("SPARK-X: get observable metrics with sort by callback") {
val df = spark.range(100)
  .observe(
name = "my_event",
min($"id").as("min_val"),
max($"id").as("max_val"),
// Test unresolved alias
sum($"id"),
count(when($"id" % 2 === 0, 1)).as("num_even"))
  .observe(
name = "other_event",
avg($"id").cast("int").as("avg_val"))
  .sort($"id".desc)

validateObservedMetrics(df)
  }
{code}

The count aggregate reports twice the number of rows:
{code}
[info] - SPARK-X: get observable metrics with sort by callback *** FAILED 
*** (169 milliseconds)
[info]   [0,99,9900,100] did not equal [0,99,4950,50] 
(DataFrameCallbackSuite.scala:342)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at 
org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at 
org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at 
org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at 
org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at 
org.apache.spark.sql.util.DataFrameCallbackSuite.checkMetrics$1(DataFrameCallbackSuite.scala:342)
[info]   at 
org.apache.spark.sql.util.DataFrameCallbackSuite.validateObservedMetrics(DataFrameCallbackSuite.scala:350)
[info]   at 
org.apache.spark.sql.util.DataFrameCallbackSuite.$anonfun$new$21(DataFrameCallbackSuite.scala:324)
[info]   at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
{code}

I could not figure out how this happens. Hopefully the UT can help with debugging.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-36844) Window function "first" (unboundedFollowing) appears significantly slower than "last" (unboundedPreceding) in identical circumstances

2021-10-29 Thread Tanel Kiis (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17436116#comment-17436116
 ] 

Tanel Kiis commented on SPARK-36844:


Hello,

I also hit this issue a while back and found that it is partly explained in
this code comment:
https://github.com/apache/spark/blob/abf9675a7559d5666e40f25098334b5edbf8c0c3/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala#L609-L611

So it is not the fault of the first aggregator, but of the
UnboundedFollowing window frame. There are definitely some optimizations that
could be done.

If I followed your code correctly, then I think you would be better off using
the [lead|https://spark.apache.org/docs/latest/api/sql/index.html#lead] and
[lag|https://spark.apache.org/docs/latest/api/sql/index.html#lag] window
functions. With those you can drop the .rowsBetween(...) part from your window
specs.
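
For illustration, here is a minimal Scala sketch of that suggestion; the
DataFrame and column names (df, PORT_TYPE, loss_process, rank, scale_factor)
are assumptions taken from the snippet quoted below, and it only shows the API
shape, not a drop-in replacement for the ignorenulls logic:
{code}
// Minimal sketch: lead/lag read a fixed offset from the current row, so no
// .rowsBetween(...) frame specification is needed on the window.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag, lead}

val win = Window.partitionBy("PORT_TYPE", "loss_process").orderBy("rank")

val withNeighbours = df
  .withColumn("last_sf", lag(col("scale_factor"), 1).over(win))   // previous row
  .withColumn("next_sf", lead(col("scale_factor"), 1).over(win))  // next row
{code}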

> Window function "first" (unboundedFollowing) appears significantly slower 
> than "last" (unboundedPreceding) in identical circumstances
> -
>
> Key: SPARK-36844
> URL: https://issues.apache.org/jira/browse/SPARK-36844
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, Windows
>Affects Versions: 3.1.1
>Reporter: Alain Bryden
>Priority: Minor
> Attachments: Physical Plan 2 - workaround.png, Pysical Plan.png
>
>
> I originally posted a question on SO because I thought perhaps I was doing 
> something wrong:
> [https://stackoverflow.com/questions/69308560|https://stackoverflow.com/questions/69308560/spark-first-window-function-is-taking-much-longer-than-last?noredirect=1#comment122505685_69308560]
> Perhaps I am, but I'm now fairly convinced that there's something wonky with 
> the implementation of `first` that's causing it to unnecessarily have a much 
> worse complexity than `last`.
>  
> More or less copy-pasted from the above post:
> I was working on a pyspark routine to interpolate the missing values in a 
> configuration table.
> Imagine a table of configuration values that go from 0 to 50,000. The user 
> specifies a few data points in between (say at 0, 50, 100, 500, 2000, 50) 
> and we interpolate the remainder. My solution mostly follows [this blog 
> post|https://walkenho.github.io/interpolating-time-series-p2-spark/] quite 
> closely, except I'm not using any UDFs.
> In troubleshooting the performance of this (takes ~3 minutes) I found that 
> one particular window function is taking all of the time, and everything else 
> I'm doing takes mere seconds.
> Here is the main area of interest - where I use window functions to fill in 
> the previous and next user-supplied configuration values:
> {code:python}
> from pyspark.sql import Window, functions as F
> # Create partition windows that are required to generate new rows from the 
> ones provided
> win_last = Window.partitionBy('PORT_TYPE', 
> 'loss_process').orderBy('rank').rowsBetween(Window.unboundedPreceding, 0)
> win_next = Window.partitionBy('PORT_TYPE', 
> 'loss_process').orderBy('rank').rowsBetween(0, Window.unboundedFollowing)
> # Join back in the provided config table to populate the "known" scale factors
> df_part1 = (df_scale_factors_template
>   .join(df_users_config, ['PORT_TYPE', 'loss_process', 'rank'], 'leftouter')
>   # Add computed columns that can lookup the prior config and next config for 
> each missing value
>   .withColumn('last_rank', F.last( F.col('rank'), 
> ignorenulls=True).over(win_last))
>   .withColumn('last_sf',   F.last( F.col('scale_factor'), 
> ignorenulls=True).over(win_last))
> ).cache()
> debug_log_dataframe(df_part1 , 'df_part1') # Force a .count() and time Part1
> df_part2 = (df_part1
>   .withColumn('next_rank', F.first(F.col('rank'), 
> ignorenulls=True).over(win_next))
>   .withColumn('next_sf',   F.first(F.col('scale_factor'), 
> ignorenulls=True).over(win_next))
> ).cache()
> debug_log_dataframe(df_part2 , 'df_part2') # Force a .count() and time Part2
> df_part3 = (df_part2
>   # Implements standard linear interpolation: y = y1 + ((y2-y1)/(x2-x1)) * 
> (x-x1)
>   .withColumn('scale_factor', 
>   F.when(F.col('last_rank')==F.col('next_rank'), 
> F.col('last_sf')) # Handle div/0 case
>   .otherwise(F.col('last_sf') + 
> ((F.col('next_sf')-F.col('last_sf'))/(F.col('next_rank')-F.col('last_rank'))) 
> * (F.col('rank')-F.col('last_rank'
>   .select('PORT_TYPE', 'loss_process', 'rank', 'scale_factor')
> ).cache()
> debug_log_dataframe(df_part3, 'df_part3', explain: True)
> {code}
>  
> The above used to be a single chained dataframe statement, but I've since 
> split it into 3 parts so that I could isolate the part that's taking so long. 
> The results 

[jira] [Created] (SPARK-37074) Push extra predicates through non-join

2021-10-20 Thread Tanel Kiis (Jira)
Tanel Kiis created SPARK-37074:
--

 Summary: Push extra predicates through non-join
 Key: SPARK-37074
 URL: https://issues.apache.org/jira/browse/SPARK-37074
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.3.0
Reporter: Tanel Kiis


In the Optimizer we could partially push some predicates through non-join
nodes that produce new columns: Aggregate, Generate, Window.
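
For illustration only (the table and column names below are made up), consider
a filter on top of an aggregate: the predicate on the grouping column could be
pushed below the Aggregate, while the predicate on the aggregated column has to
stay above it.
{code}
// Assumes the usual spark.implicits._ and functions._ imports.
// "region" is a grouping column, "total" is produced by the Aggregate:
// the region predicate could be evaluated before aggregating, the total
// predicate cannot.
val filtered = sales
  .groupBy($"region")
  .agg(sum($"amount").as("total"))
  .filter($"region" === "EU" && $"total" > 1000)
{code}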



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-36861) Partition columns are overly eagerly parsed as dates

2021-09-28 Thread Tanel Kiis (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17421334#comment-17421334
 ] 

Tanel Kiis commented on SPARK-36861:


Yes, in 3.1 it is parsed as string. In 3.3 (master) it is parsed as date.

> Partition columns are overly eagerly parsed as dates
> 
>
> Key: SPARK-36861
> URL: https://issues.apache.org/jira/browse/SPARK-36861
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Tanel Kiis
>Priority: Blocker
>
> I have an input directory with subdirs:
> * hour=2021-01-01T00
> * hour=2021-01-01T01
> * hour=2021-01-01T02
> * ...
> in Spark 3.1 the 'hour' column is parsed as a string type, but in the 3.2 RC it
> is parsed as a date type and the hour part is lost.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-36861) Partition columns are overly eagerly parsed as dates

2021-09-27 Thread Tanel Kiis (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17420553#comment-17420553
 ] 

Tanel Kiis commented on SPARK-36861:


Sorry, indeed I ran the test on master. Never mind then, it does not impact the
3.2 release.

> Partition columns are overly eagerly parsed as dates
> 
>
> Key: SPARK-36861
> URL: https://issues.apache.org/jira/browse/SPARK-36861
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Tanel Kiis
>Priority: Major
>
> I have an input directory with subdirs:
> * hour=2021-01-01T00
> * hour=2021-01-01T01
> * hour=2021-01-01T02
> * ...
> in Spark 3.1 the 'hour' column is parsed as a string type, but in the 3.2 RC it
> is parsed as a date type and the hour part is lost.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-36861) Partition columns are overly eagerly parsed as dates

2021-09-27 Thread Tanel Kiis (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17420533#comment-17420533
 ] 

Tanel Kiis commented on SPARK-36861:


If this is expected behaviour, then I would expect there to be a simple way to
turn this off. Currently the only one I can think of is manually specifying the
schema.
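
For illustration, a minimal Scala sketch of the schema workaround; the input
path, file format and the extra data column are assumptions:
{code}
// Declaring the partition column "hour" as StringType in a user-provided schema
// keeps the raw directory value instead of letting partition type inference
// turn it into a date.
import org.apache.spark.sql.types.{LongType, StringType, StructType}

val schema = new StructType()
  .add("value", LongType)    // assumed data column
  .add("hour", StringType)   // partition column, kept as a string

val df = spark.read.schema(schema).parquet("/path/to/input")
{code}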

> Partition columns are overly eagerly parsed as dates
> 
>
> Key: SPARK-36861
> URL: https://issues.apache.org/jira/browse/SPARK-36861
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Tanel Kiis
>Priority: Major
>
> I have an input directory with subdirs:
> * hour=2021-01-01T00
> * hour=2021-01-01T01
> * hour=2021-01-01T02
> * ...
> in Spark 3.1 the 'hour' column is parsed as a string type, but in the 3.2 RC it
> is parsed as a date type and the hour part is lost.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-36861) Partition columns are overly eagerly parsed as dates

2021-09-27 Thread Tanel Kiis (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17420532#comment-17420532
 ] 

Tanel Kiis edited comment on SPARK-36861 at 9/27/21, 7:45 AM:
--

[~Gengliang.Wang] I think that this should be considered a blocker for the
3.2 release.
 


was (Author: tanelk):
[~Gengliang.Wang] I think that this should be considered a blocker.
 

> Partition columns are overly eagerly parsed as dates
> 
>
> Key: SPARK-36861
> URL: https://issues.apache.org/jira/browse/SPARK-36861
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Tanel Kiis
>Priority: Major
>
> I have an input directory with subdirs:
> * hour=2021-01-01T00
> * hour=2021-01-01T01
> * hour=2021-01-01T02
> * ...
> in Spark 3.1 the 'hour' column is parsed as a string type, but in the 3.2 RC it
> is parsed as a date type and the hour part is lost.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-36861) Partition columns are overly eagerly parsed as dates

2021-09-27 Thread Tanel Kiis (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17420532#comment-17420532
 ] 

Tanel Kiis commented on SPARK-36861:


[~Gengliang.Wang] I think that this should be considered a blocker.
 

> Partition columns are overly eagerly parsed as dates
> 
>
> Key: SPARK-36861
> URL: https://issues.apache.org/jira/browse/SPARK-36861
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Tanel Kiis
>Priority: Major
>
> I have an input directory with subdirs:
> * hour=2021-01-01T00
> * hour=2021-01-01T01
> * hour=2021-01-01T02
> * ...
> in Spark 3.1 the 'hour' column is parsed as a string type, but in the 3.2 RC it
> is parsed as a date type and the hour part is lost.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-36861) Partition columns are overly eagerly parsed as dates

2021-09-27 Thread Tanel Kiis (Jira)
Tanel Kiis created SPARK-36861:
--

 Summary: Partition columns are overly eagerly parsed as dates
 Key: SPARK-36861
 URL: https://issues.apache.org/jira/browse/SPARK-36861
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.2.0
Reporter: Tanel Kiis


I have an input directory with subdirs:
* hour=2021-01-01T00
* hour=2021-01-01T01
* hour=2021-01-01T02
* ...

in Spark 3.1 the 'hour' column is parsed as a string type, but in the 3.2 RC it is
parsed as a date type and the hour part is lost.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-36496) Remove literals from grouping expressions when using the DataFrame API

2021-08-12 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36496?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-36496:
---
Description: The RemoveLiteralFromGroupExpressions rule might not work
when using the DataFrame API.
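
For example (a sketch only; df and the column name are assumptions), a literal
can end up in the grouping expressions directly through the DataFrame API even
though it contributes nothing to the grouping:
{code}
// Assumes the usual spark.implicits._ and functions._ imports.
// The lit(1) grouping expression is constant for every row, so a rule like
// RemoveLiteralFromGroupExpressions should be able to drop it.
val grouped = df
  .groupBy($"key", lit(1))
  .agg(count(lit(1)).as("cnt"))
{code}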

> Remove literals from grouping expressions when using the DataFrame API
> --
>
> Key: SPARK-36496
> URL: https://issues.apache.org/jira/browse/SPARK-36496
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Tanel Kiis
>Priority: Major
>
> The RemoveLiteralFromGroupExpressions rule might not work when using the
> DataFrame API.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-36496) Remove literals from grouping expressions when using the DataFrame API

2021-08-12 Thread Tanel Kiis (Jira)
Tanel Kiis created SPARK-36496:
--

 Summary: Remove literals from grouping expressions when using the 
DataFrame API
 Key: SPARK-36496
 URL: https://issues.apache.org/jira/browse/SPARK-36496
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.2.0
Reporter: Tanel Kiis






--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-35765) Distinct aggs are not duplicate sensitive

2021-06-15 Thread Tanel Kiis (Jira)
Tanel Kiis created SPARK-35765:
--

 Summary: Distinct aggs are not duplicate sensitive
 Key: SPARK-35765
 URL: https://issues.apache.org/jira/browse/SPARK-35765
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.2.0
Reporter: Tanel Kiis


Extended RemoveRedundantAggregates to remove deduplicating aggregations before 
aggregations that ignore duplicates.
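
A minimal Scala sketch of the targeted pattern (column names are made up): max
ignores duplicates, so the deduplicating aggregate below it is redundant and the
extended rule should be able to remove it.
{code}
// Assumes the usual spark.implicits._ and functions._ imports.
// dropDuplicates only removes duplicate rows, and max is insensitive to
// duplicates, so the dropDuplicates step adds nothing to the final result.
val bestScores = events
  .select($"user", $"score")
  .dropDuplicates()
  .groupBy($"user")
  .agg(max($"score").as("best_score"))
{code}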





--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-35695) QueryExecutionListener does not see any observed metrics fired before persist/cache

2021-06-09 Thread Tanel Kiis (Jira)
Tanel Kiis created SPARK-35695:
--

 Summary: QueryExecutionListener does not see any observed metrics 
fired before persist/cache
 Key: SPARK-35695
 URL: https://issues.apache.org/jira/browse/SPARK-35695
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.2.0
Reporter: Tanel Kiis


This example properly fires the event
{code}
spark.range(100)
  .observe(
name = "other_event",
avg($"id").cast("int").as("avg_val"))
  .collect()
{code}

But when I add persist, then no event is fired:
{code}
spark.range(100)
  .observe(
name = "other_event",
avg($"id").cast("int").as("avg_val"))
  .persist()
  .collect()
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-35695) QueryExecutionListener does not see any observed metrics fired before persist/cache

2021-06-09 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-35695:
---
Description: 
This example properly fires the event
{code}
spark.range(100)
  .observe(
name = "other_event",
avg($"id").cast("int").as("avg_val"))
  .collect()
{code}

But when I add persist, then no event is fired or seen (not sure which):
{code}
spark.range(100)
  .observe(
name = "my_event",
avg($"id").cast("int").as("avg_val"))
  .persist()
  .collect()
{code}

I assume that on the first run the event is fired.

The listener:
{code}
val metricMaps = ArrayBuffer.empty[Map[String, Row]]
val listener = new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, duration: 
Long): Unit = {
metricMaps += qe.observedMetrics
  }

  override def onFailure(funcName: String, qe: QueryExecution, exception: 
Exception): Unit = {
// No-op
  }
}
spark.listenerManager.register(listener)
{code}

  was:
This example properly fires the event
{code}
spark.range(100)
  .observe(
name = "other_event",
avg($"id").cast("int").as("avg_val"))
  .collect()
{code}

But when I add persist, then no event is fired or seen (not sure which):
{code}
spark.range(100)
  .observe(
name = "my_event",
avg($"id").cast("int").as("avg_val"))
  .persist()
  .collect()
{code}

The listener:
{code}
val metricMaps = ArrayBuffer.empty[Map[String, Row]]
val listener = new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, duration: 
Long): Unit = {
metricMaps += qe.observedMetrics
  }

  override def onFailure(funcName: String, qe: QueryExecution, exception: 
Exception): Unit = {
// No-op
  }
}
spark.listenerManager.register(listener)
{code}


> QueryExecutionListener does not see any observed metrics fired before 
> persist/cache
> ---
>
> Key: SPARK-35695
> URL: https://issues.apache.org/jira/browse/SPARK-35695
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Tanel Kiis
>Priority: Major
>
> This example properly fires the event
> {code}
> spark.range(100)
>   .observe(
> name = "other_event",
> avg($"id").cast("int").as("avg_val"))
>   .collect()
> {code}
> But when I add persist, then no event is fired or seen (not sure which):
> {code}
> spark.range(100)
>   .observe(
> name = "my_event",
> avg($"id").cast("int").as("avg_val"))
>   .persist()
>   .collect()
> {code}
> I assume that on the first run the event is fired.
> The listener:
> {code}
> val metricMaps = ArrayBuffer.empty[Map[String, Row]]
> val listener = new QueryExecutionListener {
>   override def onSuccess(funcName: String, qe: QueryExecution, duration: 
> Long): Unit = {
> metricMaps += qe.observedMetrics
>   }
>   override def onFailure(funcName: String, qe: QueryExecution, exception: 
> Exception): Unit = {
> // No-op
>   }
> }
> spark.listenerManager.register(listener)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-35695) QueryExecutionListener does not see any observed metrics fired before persist/cache

2021-06-09 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-35695:
---
Description: 
This example properly fires the event
{code}
spark.range(100)
  .observe(
name = "other_event",
avg($"id").cast("int").as("avg_val"))
  .collect()
{code}

But when I add persist, then no event is fired or seen (not sure which):
{code}
spark.range(100)
  .observe(
name = "my_event",
avg($"id").cast("int").as("avg_val"))
  .persist()
  .collect()
{code}

The listener:
{code}
val metricMaps = ArrayBuffer.empty[Map[String, Row]]
val listener = new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, duration: 
Long): Unit = {
metricMaps += qe.observedMetrics
  }

  override def onFailure(funcName: String, qe: QueryExecution, exception: 
Exception): Unit = {
// No-op
  }
}
spark.listenerManager.register(listener)
{code}

  was:
This example properly fires the event
{code}
spark.range(100)
  .observe(
name = "other_event",
avg($"id").cast("int").as("avg_val"))
  .collect()
{code}

But when I add persist, then no event is fired or seen (not sure which):
{code}
spark.range(100)
  .observe(
name = "my_event",
avg($"id").cast("int").as("avg_val"))
  .persist()
  .collect()
{code}

I assume that on the first run the event is fired.

The listener:
{code}
val metricMaps = ArrayBuffer.empty[Map[String, Row]]
val listener = new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, duration: 
Long): Unit = {
metricMaps += qe.observedMetrics
  }

  override def onFailure(funcName: String, qe: QueryExecution, exception: 
Exception): Unit = {
// No-op
  }
}
spark.listenerManager.register(listener)
{code}


> QueryExecutionListener does not see any observed metrics fired before 
> persist/cache
> ---
>
> Key: SPARK-35695
> URL: https://issues.apache.org/jira/browse/SPARK-35695
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Tanel Kiis
>Priority: Major
>
> This example properly fires the event
> {code}
> spark.range(100)
>   .observe(
> name = "other_event",
> avg($"id").cast("int").as("avg_val"))
>   .collect()
> {code}
> But when I add persist, then no event is fired or seen (not sure which):
> {code}
> spark.range(100)
>   .observe(
> name = "my_event",
> avg($"id").cast("int").as("avg_val"))
>   .persist()
>   .collect()
> {code}
> The listener:
> {code}
> val metricMaps = ArrayBuffer.empty[Map[String, Row]]
> val listener = new QueryExecutionListener {
>   override def onSuccess(funcName: String, qe: QueryExecution, duration: 
> Long): Unit = {
> metricMaps += qe.observedMetrics
>   }
>   override def onFailure(funcName: String, qe: QueryExecution, exception: 
> Exception): Unit = {
> // No-op
>   }
> }
> spark.listenerManager.register(listener)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-35695) QueryExecutionListener does not see any observed metrics fired before persist/cache

2021-06-09 Thread Tanel Kiis (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17359849#comment-17359849
 ] 

Tanel Kiis commented on SPARK-35695:


[~cloud_fan] and [~sarutak], you are currently working on a PR related to the 
observed metrics. Perhaps you could take a look at this one too.

> QueryExecutionListener does not see any observed metrics fired before 
> persist/cache
> ---
>
> Key: SPARK-35695
> URL: https://issues.apache.org/jira/browse/SPARK-35695
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Tanel Kiis
>Priority: Major
>
> This example properly fires the event
> {code}
> spark.range(100)
>   .observe(
> name = "other_event",
> avg($"id").cast("int").as("avg_val"))
>   .collect()
> {code}
> But when I add persist, then no event is fired or seen (not sure which):
> {code}
> spark.range(100)
>   .observe(
> name = "my_event",
> avg($"id").cast("int").as("avg_val"))
>   .persist()
>   .collect()
> {code}
> The listener:
> {code}
> val metricMaps = ArrayBuffer.empty[Map[String, Row]]
> val listener = new QueryExecutionListener {
>   override def onSuccess(funcName: String, qe: QueryExecution, duration: 
> Long): Unit = {
> metricMaps += qe.observedMetrics
>   }
>   override def onFailure(funcName: String, qe: QueryExecution, exception: 
> Exception): Unit = {
> // No-op
>   }
> }
> spark.listenerManager.register(listener)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-35695) QueryExecutionListener does not see any observed metrics fired before persist/cache

2021-06-09 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-35695:
---
Description: 
This example properly fires the event
{code}
spark.range(100)
  .observe(
name = "other_event",
avg($"id").cast("int").as("avg_val"))
  .collect()
{code}

But when I add persist, then no event is fired or seen (not sure which):
{code}
spark.range(100)
  .observe(
name = "my_event",
avg($"id").cast("int").as("avg_val"))
  .persist()
  .collect()
{code}

The listener:
{code}
val metricMaps = ArrayBuffer.empty[Map[String, Row]]
val listener = new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, duration: 
Long): Unit = {
metricMaps += qe.observedMetrics
  }

  override def onFailure(funcName: String, qe: QueryExecution, exception: 
Exception): Unit = {
// No-op
  }
}
spark.listenerManager.register(listener)
{code}

  was:
This example properly fires the event
{code}
spark.range(100)
  .observe(
name = "other_event",
avg($"id").cast("int").as("avg_val"))
  .collect()
{code}

But when I add persist, then no event is fired or seen (not sure which):
{code}
spark.range(100)
  .observe(
name = "my_event",
avg($"id").cast("int").as("avg_val"))
  .persist()
  .collect()
{code}


> QueryExecutionListener does not see any observed metrics fired before 
> persist/cache
> ---
>
> Key: SPARK-35695
> URL: https://issues.apache.org/jira/browse/SPARK-35695
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Tanel Kiis
>Priority: Major
>
> This example properly fires the event
> {code}
> spark.range(100)
>   .observe(
> name = "other_event",
> avg($"id").cast("int").as("avg_val"))
>   .collect()
> {code}
> But when I add persist, then no event is fired or seen (not sure which):
> {code}
> spark.range(100)
>   .observe(
> name = "my_event",
> avg($"id").cast("int").as("avg_val"))
>   .persist()
>   .collect()
> {code}
> The listener:
> {code}
> val metricMaps = ArrayBuffer.empty[Map[String, Row]]
> val listener = new QueryExecutionListener {
>   override def onSuccess(funcName: String, qe: QueryExecution, duration: 
> Long): Unit = {
> metricMaps += qe.observedMetrics
>   }
>   override def onFailure(funcName: String, qe: QueryExecution, exception: 
> Exception): Unit = {
> // No-op
>   }
> }
> spark.listenerManager.register(listener)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-35695) QueryExecutionListener does not see any observed metrics fired before persist/cache

2021-06-09 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-35695:
---
Description: 
This example properly fires the event
{code}
spark.range(100)
  .observe(
name = "other_event",
avg($"id").cast("int").as("avg_val"))
  .collect()
{code}

But when I add persist, then no event is fired or seen (not sure which):
{code}
spark.range(100)
  .observe(
name = "my_event",
avg($"id").cast("int").as("avg_val"))
  .persist()
  .collect()
{code}

  was:
This example properly fires the event
{code}
spark.range(100)
  .observe(
name = "other_event",
avg($"id").cast("int").as("avg_val"))
  .collect()
{code}

But when I add persist, then no event is fired or seen (not sure which):
{code}
spark.range(100)
  .observe(
name = "other_event",
avg($"id").cast("int").as("avg_val"))
  .persist()
  .collect()
{code}


> QueryExecutionListener does not see any observed metrics fired before 
> persist/cache
> ---
>
> Key: SPARK-35695
> URL: https://issues.apache.org/jira/browse/SPARK-35695
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Tanel Kiis
>Priority: Major
>
> This example properly fires the event
> {code}
> spark.range(100)
>   .observe(
> name = "other_event",
> avg($"id").cast("int").as("avg_val"))
>   .collect()
> {code}
> But when I add persist, then no event is fired or seen (not sure which):
> {code}
> spark.range(100)
>   .observe(
> name = "my_event",
> avg($"id").cast("int").as("avg_val"))
>   .persist()
>   .collect()
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-35695) QueryExecutionListener does not see any observed metrics fired before persist/cache

2021-06-09 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-35695:
---
Description: 
This example properly fires the event
{code}
spark.range(100)
  .observe(
name = "other_event",
avg($"id").cast("int").as("avg_val"))
  .collect()
{code}

But when I add persist, then no event is fired or seen (not sure which):
{code}
spark.range(100)
  .observe(
name = "other_event",
avg($"id").cast("int").as("avg_val"))
  .persist()
  .collect()
{code}

  was:
This example properly fires the event
{code}
spark.range(100)
  .observe(
name = "other_event",
avg($"id").cast("int").as("avg_val"))
  .collect()
{code}

But when I add persist, then no event is fired:
{code}
spark.range(100)
  .observe(
name = "other_event",
avg($"id").cast("int").as("avg_val"))
  .persist()
  .collect()
{code}


> QueryExecutionListener does not see any observed metrics fired before 
> persist/cache
> ---
>
> Key: SPARK-35695
> URL: https://issues.apache.org/jira/browse/SPARK-35695
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Tanel Kiis
>Priority: Major
>
> This example properly fires the event
> {code}
> spark.range(100)
>   .observe(
> name = "other_event",
> avg($"id").cast("int").as("avg_val"))
>   .collect()
> {code}
> But when I add persist, then no event is fired or seen (not sure which):
> {code}
> spark.range(100)
>   .observe(
> name = "other_event",
> avg($"id").cast("int").as("avg_val"))
>   .persist()
>   .collect()
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-35630) ExpandExec should not introduce unnecessary exchanges

2021-06-03 Thread Tanel Kiis (Jira)
Tanel Kiis created SPARK-35630:
--

 Summary: ExpandExec should not introduce unnecessary exchanges
 Key: SPARK-35630
 URL: https://issues.apache.org/jira/browse/SPARK-35630
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.2.0
Reporter: Tanel Kiis


The Expand is commonly introduced by the RewriteDistinctAggregates optimizer 
rule.
In that case there can be several attributes that are kept as they are by the 
Expand.
If the child's output is partitioned by those attributes, then so will be the 
output of the Expand.
In the general case the Expand can output data with arbitrary partitioning, so
it is set as UNKNOWN partitioning.
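
A rough Scala sketch of the situation (column names are assumptions): several
distinct aggregates over the same grouping key make RewriteDistinctAggregates
insert an Expand, and since the grouping key passes through the Expand
unchanged, a child that is already partitioned by it should not need another
exchange.
{code}
// Assumes the usual spark.implicits._ and functions._ imports.
// Two different distinct aggregates trigger RewriteDistinctAggregates, which
// places an Expand between the repartition and the final aggregate. "user" is
// kept as-is by the Expand, so the repartition by "user" could still satisfy
// the aggregate's required distribution.
val stats = events
  .repartition($"user")
  .groupBy($"user")
  .agg(countDistinct($"page").as("pages"), countDistinct($"session").as("sessions"))
stats.explain()
{code}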




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-35296) Dataset.observe fails with an assertion

2021-06-03 Thread Tanel Kiis (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17356253#comment-17356253
 ] 

Tanel Kiis commented on SPARK-35296:


Perhaps someone who knows the internals better could help here. [~cloud_fan]?
I tried to take another look, but I can't figure it out.

> Dataset.observe fails with an assertion
> ---
>
> Key: SPARK-35296
> URL: https://issues.apache.org/jira/browse/SPARK-35296
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.1, 3.2.0
>Reporter: Tanel Kiis
>Priority: Major
> Attachments: 2021-05-03_18-34.png
>
>
> I hit this assertion error when using dataset.observe:
> {code}
> java.lang.AssertionError: assertion failed
>   at scala.Predef$.assert(Predef.scala:208) ~[scala-library-2.12.10.jar:?]
>   at 
> org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204)
>  ~[spark-sql_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72)
>  ~[spark-sql_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71)
>  ~[spark-sql_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125) 
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124)
>  ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124)
>  ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137)
>  ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135)
>  ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
> ~[scala-library-2.12.10.jar:?]
>   at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
> ~[scala-library-2.12.10.jar:?]
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) 
> ~[scala-library-2.12.10.jar:?]
>   at 
> org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:135) 
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124) 
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at org.apache.spark.scheduler.Task.run(Task.scala:147) 
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
>  ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) 
> [spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) 
> [spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  [?:1.8.0_282]
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  [?:1.8.0_282]
>   at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
> {code}
> A workaround that I used was to add .coalesce(1) before calling this method.
> It happens in a quite complex query and I have not been able to reproduce
> this with a simpler query.
> Added a screenshot of the debugger at the moment of the exception:
>  !2021-05-03_18-34.png! 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-35296) Dataset.observe fails with an assertion

2021-06-03 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-35296:
---
Affects Version/s: 3.2.0

> Dataset.observe fails with an assertion
> ---
>
> Key: SPARK-35296
> URL: https://issues.apache.org/jira/browse/SPARK-35296
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.1, 3.2.0
>Reporter: Tanel Kiis
>Priority: Major
> Attachments: 2021-05-03_18-34.png
>
>
> I hit this assertion error when using dataset.observe:
> {code}
> java.lang.AssertionError: assertion failed
>   at scala.Predef$.assert(Predef.scala:208) ~[scala-library-2.12.10.jar:?]
>   at 
> org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204)
>  ~[spark-sql_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72)
>  ~[spark-sql_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71)
>  ~[spark-sql_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125) 
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124)
>  ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124)
>  ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137)
>  ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135)
>  ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
> ~[scala-library-2.12.10.jar:?]
>   at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
> ~[scala-library-2.12.10.jar:?]
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) 
> ~[scala-library-2.12.10.jar:?]
>   at 
> org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:135) 
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124) 
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at org.apache.spark.scheduler.Task.run(Task.scala:147) 
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
>  ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) 
> [spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) 
> [spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  [?:1.8.0_282]
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  [?:1.8.0_282]
>   at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
> {code}
> A workaround that I used was to add .coalesce(1) before calling this method.
> It happens in a quite complex query and I have not been able to reproduce
> this with a simpler query.
> Added a screenshot of the debugger at the moment of the exception:
>  !2021-05-03_18-34.png! 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-34623) Deduplicate window expressions

2021-06-02 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis resolved SPARK-34623.

Resolution: Won't Do

> Deduplicate window expressions
> --
>
> Key: SPARK-34623
> URL: https://issues.apache.org/jira/browse/SPARK-34623
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Tanel Kiis
>Priority: Major
>
> Remove duplicate window expressions from the Window node



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-32801) Make InferFiltersFromConstraints take in account EqualNullSafe

2021-06-02 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis resolved SPARK-32801.

Resolution: Won't Do

> Make InferFiltersFromConstraints take in account EqualNullSafe
> --
>
> Key: SPARK-32801
> URL: https://issues.apache.org/jira/browse/SPARK-32801
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Tanel Kiis
>Priority: Major
>
> InferFiltersFromConstraints only infers new filters using EqualTo, 
> generalized it to also include EqualNullSafe.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-35296) Dataset.observe fails with an assertion

2021-05-03 Thread Tanel Kiis (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17338465#comment-17338465
 ] 

Tanel Kiis commented on SPARK-35296:


I finally managed to change the UT in such a way that the assertion error
happens - the coalesce added in the following diff was the missing link.
{code}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
index b3d29df1b2..16ebddd75c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
@@ -246,7 +246,7 @@ class DataFrameCallbackSuite extends QueryTest
 }
 spark.listenerManager.register(listener)
 try {
-  val df = spark.range(100)
+  val df = spark.range(0, 100, 1, 5)
 .observe(
   name = "my_event",
   min($"id").as("min_val"),
@@ -256,6 +256,7 @@ class DataFrameCallbackSuite extends QueryTest
 .observe(
   name = "other_event",
   avg($"id").cast("int").as("avg_val"))
+.coalesce(2)

   def checkMetrics(metrics: Map[String, Row]): Unit = {
 assert(metrics.size === 2)
{code}

> Dataset.observe fails with an assertion
> ---
>
> Key: SPARK-35296
> URL: https://issues.apache.org/jira/browse/SPARK-35296
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.1
>Reporter: Tanel Kiis
>Priority: Major
> Attachments: 2021-05-03_18-34.png
>
>
> I hit this assertion error when using dataset.observe:
> {code}
> java.lang.AssertionError: assertion failed
>   at scala.Predef$.assert(Predef.scala:208) ~[scala-library-2.12.10.jar:?]
>   at 
> org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204)
>  ~[spark-sql_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72)
>  ~[spark-sql_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71)
>  ~[spark-sql_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125) 
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124)
>  ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124)
>  ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137)
>  ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135)
>  ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
> ~[scala-library-2.12.10.jar:?]
>   at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
> ~[scala-library-2.12.10.jar:?]
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) 
> ~[scala-library-2.12.10.jar:?]
>   at 
> org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:135) 
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124) 
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at org.apache.spark.scheduler.Task.run(Task.scala:147) 
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
>  ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) 
> [spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) 
> [spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  [?:1.8.0_282]
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  [?:1.8.0_282]
>   at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
> {code}
> A workaround that I used was to add .coalesce(1) before calling this method.
> It happens in quite a complex query and I have not been able to reproduce 
> it with a simpler query.
> Added a screenshot of the debugger at the moment of the exception
>  !2021-05-03_18-34.png! 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

[jira] [Updated] (SPARK-35296) Dataset.observe fails with an assertion

2021-05-03 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-35296:
---
Attachment: 2021-05-03_18-34.png

> Dataset.observe fails with an assertion
> ---
>
> Key: SPARK-35296
> URL: https://issues.apache.org/jira/browse/SPARK-35296
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.1
>Reporter: Tanel Kiis
>Priority: Major
> Attachments: 2021-05-03_18-34.png
>
>
> I hit this assertion error when using dataset.observe:
> {code}
> java.lang.AssertionError: assertion failed
>   at scala.Predef$.assert(Predef.scala:208) ~[scala-library-2.12.10.jar:?]
>   at 
> org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204)
>  ~[spark-sql_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72)
>  ~[spark-sql_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71)
>  ~[spark-sql_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125) 
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124)
>  ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124)
>  ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137)
>  ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135)
>  ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
> ~[scala-library-2.12.10.jar:?]
>   at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
> ~[scala-library-2.12.10.jar:?]
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) 
> ~[scala-library-2.12.10.jar:?]
>   at 
> org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:135) 
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124) 
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at org.apache.spark.scheduler.Task.run(Task.scala:147) 
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
>  ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) 
> [spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) 
> [spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  [?:1.8.0_282]
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  [?:1.8.0_282]
>   at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
> {code}
> A workaround that I used was to add .coalesce(1) before calling this method.
> It happens in quite a complex query and I have not been able to reproduce 
> it with a simpler query.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-35296) Dataset.observe fails with an assertion

2021-05-03 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-35296:
---
Description: 
I hit this assertion error when using dataset.observe:
{code}
java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:208) ~[scala-library-2.12.10.jar:?]
at 
org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204)
 ~[spark-sql_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72)
 ~[spark-sql_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71)
 ~[spark-sql_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124)
 ~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124)
 ~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137)
 ~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135)
 ~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
~[scala-library-2.12.10.jar:?]
at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
~[scala-library-2.12.10.jar:?]
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) 
~[scala-library-2.12.10.jar:?]
at 
org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:135) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
at org.apache.spark.scheduler.Task.run(Task.scala:147) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
 ~[spark-core_2.12-3.1.1.jar:3.1.1]
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) 
[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) 
[spark-core_2.12-3.1.1.jar:3.1.1]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_282]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_282]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
{code}

A workaround that I used was to add .coalesce(1) before calling this method.

It happens in quite a complex query and I have not been able to reproduce it 
with a simpler query.

Added a screenshot of the debugger at the moment of the exception
 !2021-05-03_18-34.png! 

  was:
I hit this assertion error when using dataset.observe:
{code}
java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:208) ~[scala-library-2.12.10.jar:?]
at 
org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204)
 ~[spark-sql_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72)
 ~[spark-sql_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71)
 ~[spark-sql_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124)
 ~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124)
 ~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137)
 ~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135)
 ~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
~[scala-library-2.12.10.jar:?]
at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
~[scala-library-2.12.10.jar:?]
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) 
~[scala-library-2.12.10.jar:?]
at 
org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:135) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
at 

[jira] [Updated] (SPARK-35296) Dataset.observe fails with an assertion

2021-05-03 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-35296:
---
Description: 
I hit this assertion error when using dataset.observe:
{code}
java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:208) ~[scala-library-2.12.10.jar:?]
at 
org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204)
 ~[spark-sql_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72)
 ~[spark-sql_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71)
 ~[spark-sql_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124)
 ~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124)
 ~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137)
 ~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135)
 ~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
~[scala-library-2.12.10.jar:?]
at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
~[scala-library-2.12.10.jar:?]
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) 
~[scala-library-2.12.10.jar:?]
at 
org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:135) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
at org.apache.spark.scheduler.Task.run(Task.scala:147) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
 ~[spark-core_2.12-3.1.1.jar:3.1.1]
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) 
[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) 
[spark-core_2.12-3.1.1.jar:3.1.1]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_282]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_282]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
{code}

A workaround that I used was to add .coalesce(1) before calling this method.

It happens in quite a complex query and I have not been able to reproduce it 
with a simpler query.

 !2021-05-03_18-34.png! 

  was:
I hit this assertion error when using dataset.observe:
{code}
java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:208) ~[scala-library-2.12.10.jar:?]
at 
org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204)
 ~[spark-sql_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72)
 ~[spark-sql_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71)
 ~[spark-sql_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124)
 ~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124)
 ~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137)
 ~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135)
 ~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
~[scala-library-2.12.10.jar:?]
at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
~[scala-library-2.12.10.jar:?]
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) 
~[scala-library-2.12.10.jar:?]
at 
org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:135) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
   

[jira] [Comment Edited] (SPARK-35296) Dataset.observe fails with an assertion

2021-05-03 Thread Tanel Kiis (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17338438#comment-17338438
 ] 

Tanel Kiis edited comment on SPARK-35296 at 5/3/21, 3:58 PM:
-

I tried to change an existing UT to reproduce this:
{code}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
index b3d29df1b2..0fcd31a09c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
@@ -246,7 +246,7 @@ class DataFrameCallbackSuite extends QueryTest
 }
 spark.listenerManager.register(listener)
 try {
-  val df = spark.range(100)
+  val df = spark.range(100).repartition(2)
 .observe(
   name = "my_event",
   min($"id").as("min_val"),
{code}

But did not hit the same exception:
{code}
[info] - get observable metrics by callback *** FAILED *** (324 milliseconds)
[info]   0 did not equal 2 (DataFrameCallbackSuite.scala:261)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at 
org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at 
org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at 
org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at 
org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at 
org.apache.spark.sql.util.DataFrameCallbackSuite.checkMetrics$1(DataFrameCallbackSuite.scala:261)
[info]   at 
org.apache.spark.sql.util.DataFrameCallbackSuite.$anonfun$new$15(DataFrameCallbackSuite.scala:270)
{code}

Although I think that this should not happen either.


was (Author: tanelk):
I tried to change an existing UT to reproduce this:
{code}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
index b3d29df1b2..0fcd31a09c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
@@ -246,7 +246,7 @@ class DataFrameCallbackSuite extends QueryTest
 }
 spark.listenerManager.register(listener)
 try {
-  val df = spark.range(100)
+  val df = spark.range(100).repartition(2)
 .observe(
   name = "my_event",
   min($"id").as("min_val"),
{code}

But did not hit the same exception:
{code}
[info] - get observable metrics by callback *** FAILED *** (324 milliseconds)
[info]   0 did not equal 2 (DataFrameCallbackSuite.scala:261)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at 
org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at 
org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at 
org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at 
org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at 
org.apache.spark.sql.util.DataFrameCallbackSuite.checkMetrics$1(DataFrameCallbackSuite.scala:261)
[info]   at 
org.apache.spark.sql.util.DataFrameCallbackSuite.$anonfun$new$15(DataFrameCallbackSuite.scala:270)
{code}

> Dataset.observe fails with an assertion
> ---
>
> Key: SPARK-35296
> URL: https://issues.apache.org/jira/browse/SPARK-35296
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.1
>Reporter: Tanel Kiis
>Priority: Major
>
> I hit this assertion error when using dataset.observe:
> {code}
> java.lang.AssertionError: assertion failed
>   at scala.Predef$.assert(Predef.scala:208) ~[scala-library-2.12.10.jar:?]
>   at 
> org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204)
>  ~[spark-sql_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72)
>  ~[spark-sql_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71)
>  ~[spark-sql_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125) 
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124)
>  ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124)
>  ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> 

[jira] [Commented] (SPARK-35296) Dataset.observe fails with an assertion

2021-05-03 Thread Tanel Kiis (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17338441#comment-17338441
 ] 

Tanel Kiis commented on SPARK-35296:


[~hvanhovell] The assertion in AggregatingAccumulator was added by you. Perhaps 
you have some idea why it happens. 

> Dataset.observe fails with an assertion
> ---
>
> Key: SPARK-35296
> URL: https://issues.apache.org/jira/browse/SPARK-35296
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.1
>Reporter: Tanel Kiis
>Priority: Major
>
> I hit this assertion error when using dataset.observe:
> {code}
> java.lang.AssertionError: assertion failed
>   at scala.Predef$.assert(Predef.scala:208) ~[scala-library-2.12.10.jar:?]
>   at 
> org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204)
>  ~[spark-sql_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72)
>  ~[spark-sql_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71)
>  ~[spark-sql_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125) 
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124)
>  ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124)
>  ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137)
>  ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135)
>  ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
> ~[scala-library-2.12.10.jar:?]
>   at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
> ~[scala-library-2.12.10.jar:?]
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) 
> ~[scala-library-2.12.10.jar:?]
>   at 
> org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:135) 
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124) 
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at org.apache.spark.scheduler.Task.run(Task.scala:147) 
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
>  ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) 
> [spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) 
> [spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  [?:1.8.0_282]
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  [?:1.8.0_282]
>   at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
> {code}
> A workaround that I used was to add .coalesce(1) before calling this method.
> It happens in quite a complex query and I have not been able to reproduce 
> it with a simpler query.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-35296) Dataset.observe fails with an assertion

2021-05-03 Thread Tanel Kiis (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17338438#comment-17338438
 ] 

Tanel Kiis commented on SPARK-35296:


I tried to change an existing UT to reproduce this:
{code}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
index b3d29df1b2..0fcd31a09c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
@@ -246,7 +246,7 @@ class DataFrameCallbackSuite extends QueryTest
 }
 spark.listenerManager.register(listener)
 try {
-  val df = spark.range(100)
+  val df = spark.range(100).repartition(2)
 .observe(
   name = "my_event",
   min($"id").as("min_val"),
{code}

But did not hit the same exception:
{code}
[info] - get observable metrics by callback *** FAILED *** (324 milliseconds)
[info]   0 did not equal 2 (DataFrameCallbackSuite.scala:261)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at 
org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at 
org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at 
org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at 
org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at 
org.apache.spark.sql.util.DataFrameCallbackSuite.checkMetrics$1(DataFrameCallbackSuite.scala:261)
[info]   at 
org.apache.spark.sql.util.DataFrameCallbackSuite.$anonfun$new$15(DataFrameCallbackSuite.scala:270)
{code}

> Dataset.observe fails with an assertion
> ---
>
> Key: SPARK-35296
> URL: https://issues.apache.org/jira/browse/SPARK-35296
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.1
>Reporter: Tanel Kiis
>Priority: Major
>
> I hit this assertion error when using dataset.observe:
> {code}
> java.lang.AssertionError: assertion failed
>   at scala.Predef$.assert(Predef.scala:208) ~[scala-library-2.12.10.jar:?]
>   at 
> org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204)
>  ~[spark-sql_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72)
>  ~[spark-sql_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71)
>  ~[spark-sql_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125) 
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124)
>  ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124)
>  ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137)
>  ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135)
>  ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
> ~[scala-library-2.12.10.jar:?]
>   at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
> ~[scala-library-2.12.10.jar:?]
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) 
> ~[scala-library-2.12.10.jar:?]
>   at 
> org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:135) 
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124) 
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at org.apache.spark.scheduler.Task.run(Task.scala:147) 
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
>  ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) 
> [spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) 
> [spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  [?:1.8.0_282]
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  [?:1.8.0_282]
>   at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
> {code}
> A workaround that I used was to add .coalesce(1) before calling this method.
> It 

[jira] [Updated] (SPARK-35296) Dataset.observe fails with an assertion

2021-05-03 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-35296:
---
Description: 
I hit this assertion error when using dataset.observe:
{code}
java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:208) ~[scala-library-2.12.10.jar:?]
at 
org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204)
 ~[spark-sql_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72)
 ~[spark-sql_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71)
 ~[spark-sql_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124)
 ~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124)
 ~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137)
 ~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135)
 ~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
~[scala-library-2.12.10.jar:?]
at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
~[scala-library-2.12.10.jar:?]
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) 
~[scala-library-2.12.10.jar:?]
at 
org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:135) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
at org.apache.spark.scheduler.Task.run(Task.scala:147) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
 ~[spark-core_2.12-3.1.1.jar:3.1.1]
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) 
[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) 
[spark-core_2.12-3.1.1.jar:3.1.1]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_282]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_282]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
{code}

A workaround that I used was to add .coalesce(1) before calling this method.

It happens in quite a complex query and I have not been able to reproduce it 
with a simpler query.

  was:
I hit this assertion error when using dataset.observe:
{code}
java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:208) ~[scala-library-2.12.10.jar:?]
at 
org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204)
 ~[spark-sql_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72)
 ~[spark-sql_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71)
 ~[spark-sql_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124)
 ~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124)
 ~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137)
 ~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135)
 ~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
~[scala-library-2.12.10.jar:?]
at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
~[scala-library-2.12.10.jar:?]
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) 
~[scala-library-2.12.10.jar:?]
at 
org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:135) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
at 

[jira] [Updated] (SPARK-35296) Dataset.observe fails with an assertion

2021-05-03 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-35296:
---
Description: 
I hit this assertion error when using dataset.observe:
{code}
java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:208) ~[scala-library-2.12.10.jar:?]
at 
org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204)
 ~[spark-sql_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72)
 ~[spark-sql_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71)
 ~[spark-sql_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124)
 ~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124)
 ~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137)
 ~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135)
 ~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
~[scala-library-2.12.10.jar:?]
at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
~[scala-library-2.12.10.jar:?]
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) 
~[scala-library-2.12.10.jar:?]
at 
org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:135) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
at org.apache.spark.scheduler.Task.run(Task.scala:147) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
 ~[spark-core_2.12-3.1.1.jar:3.1.1]
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) 
[spark-core_2.12-3.1.1.jar:3.1.1]
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) 
[spark-core_2.12-3.1.1.jar:3.1.1]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_282]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_282]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
{code}

A workaround that I used is to add .coalesce(1) before calling this method.

  was:
I hit this assertion error when using dataset.observe:
{code}
{code}


> Dataset.observe fails with an assertion
> ---
>
> Key: SPARK-35296
> URL: https://issues.apache.org/jira/browse/SPARK-35296
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.1
>Reporter: Tanel Kiis
>Priority: Major
>
> I hit this assertion error when using dataset.observe:
> {code}
> java.lang.AssertionError: assertion failed
>   at scala.Predef$.assert(Predef.scala:208) ~[scala-library-2.12.10.jar:?]
>   at 
> org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204)
>  ~[spark-sql_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72)
>  ~[spark-sql_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71)
>  ~[spark-sql_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125) 
> ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124)
>  ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124)
>  ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137)
>  ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135)
>  ~[spark-core_2.12-3.1.1.jar:3.1.1]
>   at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
> ~[scala-library-2.12.10.jar:?]
>   at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
> ~[scala-library-2.12.10.jar:?]
> 

[jira] [Updated] (SPARK-35296) Dataset.observe fails with an assertion

2021-05-03 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-35296:
---
Description: 
I hit this assertion error when using dataset.observe:
{code}
{code}

  was:
I hit this assertion error when using dataset.observe:
{code}
java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:208) 
~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
at 
org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204)
 
~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
at 
org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72)
 
~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
at 
org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71)
 
~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
at 
org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125) 
~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
at 
org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124)
 
~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
at 
org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124)
 
~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
at 
org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137)
 
~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
at 
org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135)
 
~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) 
~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
at 
org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:135) 
~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
at 
org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124) 
~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
at org.apache.spark.scheduler.Task.run(Task.scala:147) 
~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
 
~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) 
[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) 
[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_162]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_162]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_162]
{code}


> Dataset.observe fails with an assertion
> ---
>
> Key: SPARK-35296
> URL: https://issues.apache.org/jira/browse/SPARK-35296
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.1
>Reporter: Tanel Kiis
>Priority: Major
>
> I hit this assertion error when using dataset.observe:
> {code}
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-35296) Dataset.observe fails with an assertion

2021-05-03 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-35296:
---
Description: 
I hit this assertion error when using dataset.observe:
{code}
java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:208) 
~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
at 
org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204)
 
~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
at 
org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72)
 
~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
at 
org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71)
 
~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
at 
org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125) 
~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
at 
org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124)
 
~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
at 
org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124)
 
~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
at 
org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137)
 
~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
at 
org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135)
 
~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) 
~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
at 
org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:135) 
~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
at 
org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124) 
~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
at org.apache.spark.scheduler.Task.run(Task.scala:147) 
~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
 
~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) 
[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) 
[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_162]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_162]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_162]
{code}

> Dataset.observe fails with an assertion
> ---
>
> Key: SPARK-35296
> URL: https://issues.apache.org/jira/browse/SPARK-35296
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.1
>Reporter: Tanel Kiis
>Priority: Major
>
> I hit this assertion error when using dataset.observe:
> {code}
> java.lang.AssertionError: assertion failed
> at scala.Predef$.assert(Predef.scala:208) 
> ~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
> at 
> org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204)
>  
> ~[bell-hadoop-utils-container-1.1-20210503.142959-520-jar-with-dependencies.jar:1.1-SNAPSHOT]
> at 
> org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72)
>  
> 

[jira] [Created] (SPARK-35296) Dataset.observe fails with an assertion

2021-05-03 Thread Tanel Kiis (Jira)
Tanel Kiis created SPARK-35296:
--

 Summary: Dataset.observe fails with an assertion
 Key: SPARK-35296
 URL: https://issues.apache.org/jira/browse/SPARK-35296
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.1.1
Reporter: Tanel Kiis






--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-34794) Nested higher-order functions broken in DSL

2021-04-05 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-34794:
---
Labels: correctness  (was: Correctness)

> Nested higher-order functions broken in DSL
> ---
>
> Key: SPARK-34794
> URL: https://issues.apache.org/jira/browse/SPARK-34794
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0, 3.1.1
> Environment: 3.1.1
>Reporter: Daniel Solow
>Priority: Major
>  Labels: correctness
>
> In Spark 3, if I have:
> {code:java}
> val df = Seq(
> (Seq(1,2,3), Seq("a", "b", "c"))
> ).toDF("numbers", "letters")
> {code}
> and I want to take the cross product of these two arrays, I can do the 
> following in SQL:
> {code:java}
> df.selectExpr("""
> FLATTEN(
> TRANSFORM(
> numbers,
> number -> TRANSFORM(
> letters,
> letter -> (number AS number, letter AS letter)
> )
> )
> ) AS zipped
> """).show(false)
> ++
> |zipped  |
> ++
> |[{1, a}, {1, b}, {1, c}, {2, a}, {2, b}, {2, c}, {3, a}, {3, b}, {3, c}]|
> ++
> {code}
> This works fine. But when I try the equivalent using the scala DSL, the 
> result is wrong:
> {code:java}
> df.select(
> f.flatten(
> f.transform(
> $"numbers",
> (number: Column) => { f.transform(
> $"letters",
> (letter: Column) => { f.struct(
> number.as("number"),
> letter.as("letter")
> ) }
> ) }
> )
> ).as("zipped")
> ).show(10, false)
> ++
> |zipped  |
> ++
> |[{a, a}, {b, b}, {c, c}, {a, a}, {b, b}, {c, c}, {a, a}, {b, b}, {c, c}]|
> ++
> {code}
> Note that the numbers are not included in the output. The explain for this 
> second version is:
> {code:java}
> == Parsed Logical Plan ==
> 'Project [flatten(transform('numbers, lambdafunction(transform('letters, 
> lambdafunction(struct(NamePlaceholder, lambda 'x AS number#442, 
> NamePlaceholder, lambda 'x AS letter#443), lambda 'x, false)), lambda 'x, 
> false))) AS zipped#444]
> +- Project [_1#303 AS numbers#308, _2#304 AS letters#309]
>+- LocalRelation [_1#303, _2#304]
> == Analyzed Logical Plan ==
> zipped: array<struct<number:int,letter:string>>
> Project [flatten(transform(numbers#308, lambdafunction(transform(letters#309, 
> lambdafunction(struct(number, lambda x#446, letter, lambda x#446), lambda 
> x#446, false)), lambda x#445, false))) AS zipped#444]
> +- Project [_1#303 AS numbers#308, _2#304 AS letters#309]
>+- LocalRelation [_1#303, _2#304]
> == Optimized Logical Plan ==
> LocalRelation [zipped#444]
> == Physical Plan ==
> LocalTableScan [zipped#444]
> {code}
> Seems like variable name x is hardcoded. And sure enough: 
> https://github.com/apache/spark/blob/v3.1.1/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L3647
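
One possible DSL-side workaround (an assumption based on the working SQL form
above, not something stated in the ticket) is to route the nested lambda
through the SQL parser with expr(), so each lambda keeps its own explicitly
named variable instead of the hardcoded x.

{code:java}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

object NestedTransformWorkaround {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val df = Seq((Seq(1, 2, 3), Seq("a", "b", "c"))).toDF("numbers", "letters")

    // Same expression as the SQL version above, wrapped in expr() so the
    // lambda variables are named explicitly (number, letter).
    df.select(
      expr(
        """FLATTEN(
          |  TRANSFORM(
          |    numbers,
          |    number -> TRANSFORM(
          |      letters,
          |      letter -> (number AS number, letter AS letter)
          |    )
          |  )
          |)""".stripMargin
      ).as("zipped")
    ).show(false)

    spark.stop()
  }
}
{code}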



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-34794) Nested higher-order functions broken in DSL

2021-04-05 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-34794:
---
Affects Version/s: 3.2.0

> Nested higher-order functions broken in DSL
> ---
>
> Key: SPARK-34794
> URL: https://issues.apache.org/jira/browse/SPARK-34794
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0, 3.1.1
> Environment: 3.1.1
>Reporter: Daniel Solow
>Priority: Major
>
> In Spark 3, if I have:
> {code:java}
> val df = Seq(
> (Seq(1,2,3), Seq("a", "b", "c"))
> ).toDF("numbers", "letters")
> {code}
> and I want to take the cross product of these two arrays, I can do the 
> following in SQL:
> {code:java}
> df.selectExpr("""
> FLATTEN(
> TRANSFORM(
> numbers,
> number -> TRANSFORM(
> letters,
> letter -> (number AS number, letter AS letter)
> )
> )
> ) AS zipped
> """).show(false)
> ++
> |zipped  |
> ++
> |[{1, a}, {1, b}, {1, c}, {2, a}, {2, b}, {2, c}, {3, a}, {3, b}, {3, c}]|
> ++
> {code}
> This works fine. But when I try the equivalent using the scala DSL, the 
> result is wrong:
> {code:java}
> df.select(
> f.flatten(
> f.transform(
> $"numbers",
> (number: Column) => { f.transform(
> $"letters",
> (letter: Column) => { f.struct(
> number.as("number"),
> letter.as("letter")
> ) }
> ) }
> )
> ).as("zipped")
> ).show(10, false)
> ++
> |zipped  |
> ++
> |[{a, a}, {b, b}, {c, c}, {a, a}, {b, b}, {c, c}, {a, a}, {b, b}, {c, c}]|
> ++
> {code}
> Note that the numbers are not included in the output. The explain for this 
> second version is:
> {code:java}
> == Parsed Logical Plan ==
> 'Project [flatten(transform('numbers, lambdafunction(transform('letters, 
> lambdafunction(struct(NamePlaceholder, lambda 'x AS number#442, 
> NamePlaceholder, lambda 'x AS letter#443), lambda 'x, false)), lambda 'x, 
> false))) AS zipped#444]
> +- Project [_1#303 AS numbers#308, _2#304 AS letters#309]
>+- LocalRelation [_1#303, _2#304]
> == Analyzed Logical Plan ==
> zipped: array<struct<number:int,letter:string>>
> Project [flatten(transform(numbers#308, lambdafunction(transform(letters#309, 
> lambdafunction(struct(number, lambda x#446, letter, lambda x#446), lambda 
> x#446, false)), lambda x#445, false))) AS zipped#444]
> +- Project [_1#303 AS numbers#308, _2#304 AS letters#309]
>+- LocalRelation [_1#303, _2#304]
> == Optimized Logical Plan ==
> LocalRelation [zipped#444]
> == Physical Plan ==
> LocalTableScan [zipped#444]
> {code}
> Seems like variable name x is hardcoded. And sure enough: 
> https://github.com/apache/spark/blob/v3.1.1/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L3647



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-34794) Nested higher-order functions broken in DSL

2021-04-05 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-34794:
---
Labels: Correctness  (was: )

> Nested higher-order functions broken in DSL
> ---
>
> Key: SPARK-34794
> URL: https://issues.apache.org/jira/browse/SPARK-34794
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0, 3.1.1
> Environment: 3.1.1
>Reporter: Daniel Solow
>Priority: Major
>  Labels: Correctness
>
> In Spark 3, if I have:
> {code:java}
> val df = Seq(
> (Seq(1,2,3), Seq("a", "b", "c"))
> ).toDF("numbers", "letters")
> {code}
> and I want to take the cross product of these two arrays, I can do the 
> following in SQL:
> {code:java}
> df.selectExpr("""
> FLATTEN(
> TRANSFORM(
> numbers,
> number -> TRANSFORM(
> letters,
> letter -> (number AS number, letter AS letter)
> )
> )
> ) AS zipped
> """).show(false)
> ++
> |zipped  |
> ++
> |[{1, a}, {1, b}, {1, c}, {2, a}, {2, b}, {2, c}, {3, a}, {3, b}, {3, c}]|
> ++
> {code}
> This works fine. But when I try the equivalent using the scala DSL, the 
> result is wrong:
> {code:java}
> df.select(
> f.flatten(
> f.transform(
> $"numbers",
> (number: Column) => { f.transform(
> $"letters",
> (letter: Column) => { f.struct(
> number.as("number"),
> letter.as("letter")
> ) }
> ) }
> )
> ).as("zipped")
> ).show(10, false)
> ++
> |zipped  |
> ++
> |[{a, a}, {b, b}, {c, c}, {a, a}, {b, b}, {c, c}, {a, a}, {b, b}, {c, c}]|
> ++
> {code}
> Note that the numbers are not included in the output. The explain for this 
> second version is:
> {code:java}
> == Parsed Logical Plan ==
> 'Project [flatten(transform('numbers, lambdafunction(transform('letters, 
> lambdafunction(struct(NamePlaceholder, lambda 'x AS number#442, 
> NamePlaceholder, lambda 'x AS letter#443), lambda 'x, false)), lambda 'x, 
> false))) AS zipped#444]
> +- Project [_1#303 AS numbers#308, _2#304 AS letters#309]
>+- LocalRelation [_1#303, _2#304]
> == Analyzed Logical Plan ==
> zipped: array<struct<number:int,letter:string>>
> Project [flatten(transform(numbers#308, lambdafunction(transform(letters#309, 
> lambdafunction(struct(number, lambda x#446, letter, lambda x#446), lambda 
> x#446, false)), lambda x#445, false))) AS zipped#444]
> +- Project [_1#303 AS numbers#308, _2#304 AS letters#309]
>+- LocalRelation [_1#303, _2#304]
> == Optimized Logical Plan ==
> LocalRelation [zipped#444]
> == Physical Plan ==
> LocalTableScan [zipped#444]
> {code}
> Seems like variable name x is hardcoded. And sure enough: 
> https://github.com/apache/spark/blob/v3.1.1/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L3647
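A minimal workaround sketch until the DSL stops hardcoding the lambda variable name, assuming the same df as in the description: fall back to the SQL lambda syntax through expr(), which lets the nested lambdas keep their own variable names (this is essentially the selectExpr variant that already works).

{code:java}
import org.apache.spark.sql.functions.expr

// The SQL lambdas use the names "number" and "letter", so they do not collide.
val zipped = df.select(
  expr("""
    FLATTEN(
      TRANSFORM(
        numbers,
        number -> TRANSFORM(
          letters,
          letter -> (number AS number, letter AS letter)
        )
      )
    )""").as("zipped"))

zipped.show(false)
{code}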



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-34922) Use better CBO cost function

2021-03-31 Thread Tanel Kiis (Jira)
Tanel Kiis created SPARK-34922:
--

 Summary: Use better CBO cost function
 Key: SPARK-34922
 URL: https://issues.apache.org/jira/browse/SPARK-34922
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.2.0
Reporter: Tanel Kiis


In SPARK-33935 we changed the CBO cost function such that it would be symmetric
- A.betterThan(B) implies that !B.betterThan(A). Before, both could have been
true.

That change introduced a performance regression in some queries. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-34882) RewriteDistinctAggregates can cause a bug if the aggregator does not ignore NULLs

2021-03-28 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-34882:
---
Description: 
{code:title=group-by.sql}
SELECT
first(DISTINCT a), last(DISTINCT a),
first(a), last(a),
first(DISTINCT b), last(DISTINCT b),
first(b), last(b)
FROM testData WHERE a IS NOT NULL AND b IS NOT NULL;{code}
{code:title=group-by.sql.out}
-- !query schema
struct<first(DISTINCT a):int,last(DISTINCT a):int,first(a):int,last(a):int,first(DISTINCT b):int,last(DISTINCT b):int,first(b):int,last(b):int>
-- !query output
NULL    1   1   3   1   NULL    1   2
{code}

The results should not be NULL, because NULL inputs are filtered out.

  was:
{code:title=group-by.sql}
SELECT first(DISTINCT a), last(DISTINCT a), first(DISTINCT b), last(DISTINCT b)
FROM testData WHERE a IS NOT NULL AND b IS NOT NULL;
{code}
{code:title=group-by.sql.out}
-- !query
SELECT first(DISTINCT  a), last(DISTINCT  a), first(DISTINCT  b), last(DISTINCT 
 b)
FROM testData WHERE a IS NOT NULL AND b IS NOT NULL
-- !query schema
struct<first(DISTINCT a):int,last(DISTINCT a):int,first(DISTINCT b):int,last(DISTINCT b):int>
-- !query output
1   3   NULL    NULL
{code}

The results should not be NULL, because NULL inputs are filtered out.


> RewriteDistinctAggregates can cause a bug if the aggregator does not ignore 
> NULLs
> -
>
> Key: SPARK-34882
> URL: https://issues.apache.org/jira/browse/SPARK-34882
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Tanel Kiis
>Priority: Major
>
> {code:title=group-by.sql}
> SELECT
> first(DISTINCT a), last(DISTINCT a),
> first(a), last(a),
> first(DISTINCT b), last(DISTINCT b),
> first(b), last(b)
> FROM testData WHERE a IS NOT NULL AND b IS NOT NULL;{code}
> {code:title=group-by.sql.out}
> -- !query schema
> struct<first(DISTINCT a):int,last(DISTINCT a):int,first(a):int,last(a):int,first(DISTINCT b):int,last(DISTINCT b):int,first(b):int,last(b):int>
> -- !query output
> NULL    1   1   3   1   NULL    1   2
> {code}
> The results should not be NULL, because NULL inputs are filtered out.
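For reference, a self-contained repro sketch of the description above; the dataset here is an illustrative stand-in for testData (any small two-column dataset without NULLs will do), and a SparkSession named spark is assumed to be in scope.

{code:java}
import spark.implicits._

Seq((1, 1), (1, 2), (2, 1), (3, 2)).toDF("a", "b").createOrReplaceTempView("testData")

spark.sql("""
  SELECT
    first(DISTINCT a), last(DISTINCT a),
    first(a), last(a),
    first(DISTINCT b), last(DISTINCT b),
    first(b), last(b)
  FROM testData WHERE a IS NOT NULL AND b IS NOT NULL
""").show()
// With the bug, some of the first/last(DISTINCT ...) columns come back NULL
// even though no NULL input rows reach the aggregation.
{code}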



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-34882) RewriteDistinctAggregates can cause a bug if the aggregator does not ignore NULLs

2021-03-28 Thread Tanel Kiis (Jira)
Tanel Kiis created SPARK-34882:
--

 Summary: RewriteDistinctAggregates can cause a bug if the 
aggregator does not ignore NULLs
 Key: SPARK-34882
 URL: https://issues.apache.org/jira/browse/SPARK-34882
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.2.0
Reporter: Tanel Kiis


{code:title=group-by.sql}
SELECT first(DISTINCT a), last(DISTINCT a), first(DISTINCT b), last(DISTINCT b)
FROM testData WHERE a IS NOT NULL AND b IS NOT NULL;
{code}
{code:title=group-by.sql.out}
-- !query
SELECT first(DISTINCT  a), last(DISTINCT  a), first(DISTINCT  b), last(DISTINCT 
 b)
FROM testData WHERE a IS NOT NULL AND b IS NOT NULL
-- !query schema
struct<first(DISTINCT a):int,last(DISTINCT a):int,first(DISTINCT b):int,last(DISTINCT b):int>
-- !query output
1   3   NULL    NULL
{code}

The results should not be NULL, because NULL inputs are filtered out.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-34876) Non-nullable aggregates can return NULL in a correlated subquery

2021-03-26 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-34876:
---
Affects Version/s: 2.4.7
   3.0.2
   3.1.1

> Non-nullable aggregates can return NULL in a correlated subquery
> 
>
> Key: SPARK-34876
> URL: https://issues.apache.org/jira/browse/SPARK-34876
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.7, 3.0.2, 3.2.0, 3.1.1
>Reporter: Tanel Kiis
>Priority: Major
>
> Test case in scalar-subquery-select.sql:
> {code:title=query}
> SELECT t1a,
> (SELECT count(t2d) FROM t2 WHERE t2a = t1a) count_t2,
> (SELECT count_if(t2d > 0) FROM t2 WHERE t2a = t1a) count_if_t2,
> (SELECT approx_count_distinct(t2d) FROM t2 WHERE t2a = t1a) 
> approx_count_distinct_t2,
> (SELECT collect_list(t2d) FROM t2 WHERE t2a = t1a) collect_list_t2,
> (SELECT collect_set(t2d) FROM t2 WHERE t2a = t1a) collect_set_t2,
> (SELECT hex(count_min_sketch(t2d, 0.5d, 0.5d, 1)) FROM t2 WHERE t2a = 
> t1a) collect_set_t2
> FROM t1;
> {code}
> {code:title=Result}
> val1a 0   0   NULL    NULL    NULL    NULL
> val1a 0   0   NULL    NULL    NULL    NULL
> val1a 0   0   NULL    NULL    NULL    NULL
> val1a 0   0   NULL    NULL    NULL    NULL
> val1b 6   6   3   [19,119,319,19,19,19]   [19,119,319]
> 00010006000100045D8D6AB9000400010001
> val1c 2   2   2   [219,19][219,19]
> 00010002000100045D8D6AB900010001
> val1d 0   0   NULL    NULL    NULL    NULL
> val1d 0   0   NULL    NULL    NULL    NULL
> val1d 0   0   NULL    NULL    NULL    NULL
> val1e 1   1   1   [19][19]
> 00010001000100045D8D6AB90001
> val1e 1   1   1   [19][19]
> 00010001000100045D8D6AB90001
> val1e 1   1   1   [19][19]
> 00010001000100045D8D6AB90001
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-34876) Non-nullable aggregates can return NULL in a correlated subquery

2021-03-26 Thread Tanel Kiis (Jira)
Tanel Kiis created SPARK-34876:
--

 Summary: Non-nullable aggregates can return NULL in a correlated 
subquery
 Key: SPARK-34876
 URL: https://issues.apache.org/jira/browse/SPARK-34876
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.2.0
Reporter: Tanel Kiis


{code:title=query}
SELECT t1a,
(SELECT count(t2d) FROM t2 WHERE t2a = t1a) count_t2,
(SELECT count_if(t2d > 0) FROM t2 WHERE t2a = t1a) count_if_t2,
(SELECT approx_count_distinct(t2d) FROM t2 WHERE t2a = t1a) 
approx_count_distinct_t2,
(SELECT collect_list(t2d) FROM t2 WHERE t2a = t1a) collect_list_t2,
(SELECT collect_set(t2d) FROM t2 WHERE t2a = t1a) collect_set_t2,
(SELECT hex(count_min_sketch(t2d, 0.5d, 0.5d, 1)) FROM t2 WHERE t2a = t1a) 
collect_set_t2
FROM t1;
{code}

{code:title=Result}
val1a   0   0   NULL    NULL    NULL    NULL
val1a   0   0   NULL    NULL    NULL    NULL
val1a   0   0   NULL    NULL    NULL    NULL
val1a   0   0   NULL    NULL    NULL    NULL
val1b   6   6   3   [19,119,319,19,19,19]   [19,119,319]
00010006000100045D8D6AB9000400010001
val1c   2   2   2   [219,19][219,19]
00010002000100045D8D6AB900010001
val1d   0   0   NULL    NULL    NULL    NULL
val1d   0   0   NULL    NULL    NULL    NULL
val1d   0   0   NULL    NULL    NULL    NULL
val1e   1   1   1   [19][19]
00010001000100045D8D6AB90001
val1e   1   1   1   [19][19]
00010001000100045D8D6AB90001
val1e   1   1   1   [19][19]
00010001000100045D8D6AB90001
{code}
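A hedged sketch of why the NULLs are surprising: outside a correlated subquery the same aggregates return their non-null "empty" results on an empty input (count_min_sketch is omitted here, and a SparkSession named spark is assumed).

{code:java}
import org.apache.spark.sql.functions._

// Global aggregation over an empty relation.
spark.range(0).agg(
  count("id"),
  expr("count_if(id > 0)"),
  approx_count_distinct("id"),
  collect_list("id"),
  collect_set("id")
).show()
// Expected: 0, 0, 0, [], [] - not NULL - which is what the correlated
// subqueries above should also produce for the val1a/val1d groups.
{code}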



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-34876) Non-nullable aggregates can return NULL in a correlated subquery

2021-03-26 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-34876:
---
Description: 

Test case in scalar-subquery-select.sql:

{code:title=query}
SELECT t1a,
(SELECT count(t2d) FROM t2 WHERE t2a = t1a) count_t2,
(SELECT count_if(t2d > 0) FROM t2 WHERE t2a = t1a) count_if_t2,
(SELECT approx_count_distinct(t2d) FROM t2 WHERE t2a = t1a) 
approx_count_distinct_t2,
(SELECT collect_list(t2d) FROM t2 WHERE t2a = t1a) collect_list_t2,
(SELECT collect_set(t2d) FROM t2 WHERE t2a = t1a) collect_set_t2,
(SELECT hex(count_min_sketch(t2d, 0.5d, 0.5d, 1)) FROM t2 WHERE t2a = t1a) 
collect_set_t2
FROM t1;
{code}

{code:title=Result}
val1a   0   0   NULL    NULL    NULL    NULL
val1a   0   0   NULL    NULL    NULL    NULL
val1a   0   0   NULL    NULL    NULL    NULL
val1a   0   0   NULL    NULL    NULL    NULL
val1b   6   6   3   [19,119,319,19,19,19]   [19,119,319]
00010006000100045D8D6AB9000400010001
val1c   2   2   2   [219,19][219,19]
00010002000100045D8D6AB900010001
val1d   0   0   NULL    NULL    NULL    NULL
val1d   0   0   NULL    NULL    NULL    NULL
val1d   0   0   NULL    NULL    NULL    NULL
val1e   1   1   1   [19][19]
00010001000100045D8D6AB90001
val1e   1   1   1   [19][19]
00010001000100045D8D6AB90001
val1e   1   1   1   [19][19]
00010001000100045D8D6AB90001
{code}

  was:
{code:title=query}
SELECT t1a,
(SELECT count(t2d) FROM t2 WHERE t2a = t1a) count_t2,
(SELECT count_if(t2d > 0) FROM t2 WHERE t2a = t1a) count_if_t2,
(SELECT approx_count_distinct(t2d) FROM t2 WHERE t2a = t1a) 
approx_count_distinct_t2,
(SELECT collect_list(t2d) FROM t2 WHERE t2a = t1a) collect_list_t2,
(SELECT collect_set(t2d) FROM t2 WHERE t2a = t1a) collect_set_t2,
(SELECT hex(count_min_sketch(t2d, 0.5d, 0.5d, 1)) FROM t2 WHERE t2a = t1a) 
collect_set_t2
FROM t1;
{code}

{code:title=Result}
val1a   0   0   NULL    NULL    NULL    NULL
val1a   0   0   NULL    NULL    NULL    NULL
val1a   0   0   NULL    NULL    NULL    NULL
val1a   0   0   NULL    NULL    NULL    NULL
val1b   6   6   3   [19,119,319,19,19,19]   [19,119,319]
00010006000100045D8D6AB9000400010001
val1c   2   2   2   [219,19][219,19]
00010002000100045D8D6AB900010001
val1d   0   0   NULL    NULL    NULL    NULL
val1d   0   0   NULL    NULL    NULL    NULL
val1d   0   0   NULL    NULL    NULL    NULL
val1e   1   1   1   [19][19]
00010001000100045D8D6AB90001
val1e   1   1   1   [19][19]
00010001000100045D8D6AB90001
val1e   1   1   1   [19][19]
00010001000100045D8D6AB90001
{code}


> Non-nullable aggregates can return NULL in a correlated subquery
> 
>
> Key: SPARK-34876
> URL: https://issues.apache.org/jira/browse/SPARK-34876
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Tanel Kiis
>Priority: Major
>
> Test case in scalar-subquery-select.sql:
> {code:title=query}
> SELECT t1a,
> (SELECT count(t2d) FROM t2 WHERE t2a = t1a) count_t2,
> (SELECT count_if(t2d > 0) FROM t2 WHERE t2a = t1a) count_if_t2,
> (SELECT approx_count_distinct(t2d) FROM t2 WHERE t2a = t1a) 
> approx_count_distinct_t2,
> (SELECT collect_list(t2d) FROM t2 WHERE t2a = t1a) collect_list_t2,
> (SELECT collect_set(t2d) FROM t2 WHERE t2a = t1a) collect_set_t2,
> (SELECT hex(count_min_sketch(t2d, 0.5d, 0.5d, 1)) FROM t2 WHERE t2a = 
> t1a) collect_set_t2
> FROM t1;
> {code}
> {code:title=Result}
> val1a 0   0   NULL    NULL    NULL    NULL
> val1a 0   0   NULL    NULL    NULL    NULL
> val1a 0   0   NULL    NULL

[jira] [Created] (SPARK-34822) Update plan stability golden files even if only explain differs

2021-03-22 Thread Tanel Kiis (Jira)
Tanel Kiis created SPARK-34822:
--

 Summary: Update plan stability golden files even if only explain 
differs
 Key: SPARK-34822
 URL: https://issues.apache.org/jira/browse/SPARK-34822
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.2.0
Reporter: Tanel Kiis


PlanStabilitySuite updates the golden files only if simplified.txt has changed. 
In some situations only explain.txt will change and the golden files are not 
updated.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-34812) RowNumberLike and RankLike should not be nullable

2021-03-21 Thread Tanel Kiis (Jira)
Tanel Kiis created SPARK-34812:
--

 Summary: RowNumberLike and RankLike should not be nullable
 Key: SPARK-34812
 URL: https://issues.apache.org/jira/browse/SPARK-34812
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.2.0
Reporter: Tanel Kiis






--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-34623) Deduplicate window expressions

2021-03-06 Thread Tanel Kiis (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17296767#comment-17296767
 ] 

Tanel Kiis commented on SPARK-34623:


I had a typo in the PR title, so it did not link up:
https://github.com/apache/spark/pull/31740

> Deduplicate window expressions
> --
>
> Key: SPARK-34623
> URL: https://issues.apache.org/jira/browse/SPARK-34623
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Tanel Kiis
>Priority: Major
>
> Remove duplicate window expressions from the Window node



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-34565) Collapse Window nodes with Project between them

2021-03-06 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-34565:
---
Description: The CollapseWindow optimizer rule can be improved to also 
collapse Window nodes, that have a Project between them. This sort of Window - 
Project - Window chains will happen when chaining the dataframe.withColumn 
calls.  (was: The CollapseWindow optimizer rule can be imroved to also collapse 
Window nodes, that have a Project between them. This sort of Window - Project - 
Window chains will happen when chaining the dataframe.withColumn calls.)

> Collapse Window nodes with Project between them
> ---
>
> Key: SPARK-34565
> URL: https://issues.apache.org/jira/browse/SPARK-34565
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Tanel Kiis
>Priority: Major
>
> The CollapseWindow optimizer rule can be improved to also collapse Window 
> nodes, that have a Project between them. This sort of Window - Project - 
> Window chains will happen when chaining the dataframe.withColumn calls.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-34623) Deduplicate window expressions

2021-03-06 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-34623:
---
Issue Type: Improvement  (was: Bug)

> Deduplicate window expressions
> --
>
> Key: SPARK-34623
> URL: https://issues.apache.org/jira/browse/SPARK-34623
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Tanel Kiis
>Priority: Major
>
> Remove duplicate window expressions from the Window node



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-34644) UDF returning array followed by explode calls the UDF multiple times and could return wrong results

2021-03-06 Thread Tanel Kiis (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17296556#comment-17296556
 ] 

Tanel Kiis commented on SPARK-34644:


A UDF with internal state should be marked as non-deterministic with the
`asNondeterministic()` method.
If the issue persists after that, then it should be considered a bug.
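A minimal sketch of that suggestion against the second example in the description (body and basicUdf are the names from the reporter's code):

{code:java}
// Marking the UDF as non-deterministic tells the optimizer that it must not
// freely duplicate, reorder or re-execute its invocations.
val basicUdf = udf(body).asNondeterministic()
{code}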

> UDF returning array followed by explode calls the UDF multiple times and 
> could return wrong results
> ---
>
> Key: SPARK-34644
> URL: https://issues.apache.org/jira/browse/SPARK-34644
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.1
>Reporter: Gavrilescu Laurentiu
>Priority: Major
>
> *Applying an UDF followed by explode calls the UDF multiple times.*
>  Using *persist* after applying the UDF mitigates the problem  
>  Consider the following code to reproduce it:
> {code:java}
> object Bug {
>   def main(args: Array[String]) {
> val sparkSession: SparkSession = 
> SparkSession.builder.master("local[4]").getOrCreate()
> val invocations = sparkSession.sparkContext.longAccumulator("invocations")
> def showTiming[T](body: => T): T = {
>   val t0 = System.nanoTime()
>   invocations.reset()
>   val res = body
>   val t1 = System.nanoTime()
>   println(s"invocations=${invocations.value}, time=${(t1 - t0) / 1e9}")
>   res
> }
> def expensive(n: Int) = {
>   Thread.sleep(100)
>   invocations.add(1)
>   1
> }
> val expensiveUdf = udf((x: Int) => (1 to 10) map { _ => expensive(x) })
> val df = sparkSession.range(10).toDF()
> showTiming(df
>   .select(explode(expensiveUdf(col("id"
>   .select(sum(col("col")))
>   .show())
> showTiming(df.select(expensiveUdf(col("id")).as("values"))
>   .persist()
>   .select(explode(col("values")))
>   .select(sum("col"))
>   .show())
>   }
> }
> {code}
>  =>
> {code:java}
> first:  invocations=300, time=11.342076635
> second: invocations=100, time=3.351682967{code}
> This can have undesired behavior and can return wrong results if a managed 
> state is used inside the UDF.
>  Imagine having the following scenario:
> 1. you have a dataframe with some string columns
>  2. you have an expensive function that creates a score based on some string 
> input
>  3. you want to get all the distinct values from all the columns and their 
> score - there is an executor level cache that holds the score values for 
> strings to minimize the execution of the expensive function
> consider the following code to reproduce it:
> {code:java}
> case class RowWithStrings(c1: String, c2: String, c3: String)
> case class ValueScore(value: String, score: Double)
> object Bug {
>   val columns: List[String] = List("c1", "c2", "c3")
>   def score(input: String): Double = {
> // insert expensive function here
> input.toDouble
>   }
>   def main(args: Array[String]) {
> lazy val sparkSession: SparkSession = {
>   val sparkSession = SparkSession.builder.master("local[4]")
> .getOrCreate()
>   sparkSession
> }
> // some cache over expensive operation
> val cache: TrieMap[String, Double] = TrieMap[String, Double]()
> // get scores for all columns in the row
> val body = (row: Row) => {
>   val arr = ArrayBuffer[ValueScore]()
>   columns foreach {
> column =>
>   val value = row.getAs[String](column)
>   if (!cache.contains(value)) {
> val computedScore = score(value)
> arr += ValueScore(value, computedScore)
> cache(value) = computedScore
>   }
>   }
>   arr
> }
> val basicUdf = udf(body)
> val values = (1 to 5) map {
>   idx =>
> // repeated values
> RowWithStrings(idx.toString, idx.toString, idx.toString)
> }
> import sparkSession.implicits._
> val df = values.toDF("c1", "c2", "c3").persist()
> val allCols = df.columns.map(col)
> df.select(basicUdf(struct(allCols: _*)).as("valuesScore"))
>   .select(explode(col("valuesScore")))
>   .distinct()
>   .show()
>   }
> }
> {code}
>  this shows:
> {code:java}
> +---+
> |col|
> +---+
> +---+
> {code}
> When adding persist before explode, the result is correct:
> {code:java}
> df.select(basicUdf(struct(allCols: _*)).as("valuesScore"))
>   .persist()
>   .select(explode(col("valuesScore")))
>   .distinct()
>   .show()
> {code}
> =>
> {code:java}
> +--------+
> |     col|
> +--------+
> |{2, 2.0}|
> |{4, 4.0}|
> |{3, 3.0}|
> |{5, 5.0}|
> |{1, 1.0}|
> +--------+
> {code}
> This is not reproducible using 3.0.2 version.  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

[jira] [Updated] (SPARK-34623) Deduplicate window expressions

2021-03-04 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-34623:
---
Description: Remove duplicate window expressions from the Window node  
(was: Remove duplicate window expressions in the Window node)

> Deduplicate window expressions
> --
>
> Key: SPARK-34623
> URL: https://issues.apache.org/jira/browse/SPARK-34623
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Tanel Kiis
>Priority: Major
>
> Remove duplicate window expressions from the Window node



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-34623) Deduplicate window expressions

2021-03-04 Thread Tanel Kiis (Jira)
Tanel Kiis created SPARK-34623:
--

 Summary: Deduplicate window expressions
 Key: SPARK-34623
 URL: https://issues.apache.org/jira/browse/SPARK-34623
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.2.0
Reporter: Tanel Kiis


Remove duplicate window expressions in the Window node
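A hedged sketch of the duplication in question (df, the column names and the window spec are illustrative):

{code:java}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val w = Window.partitionBy(col("a")).orderBy(col("b"))

// The same window expression is requested twice under different aliases;
// without deduplication the Window node computes it twice.
df.select(
  sum(col("c")).over(w).as("s1"),
  sum(col("c")).over(w).as("s2"))
{code}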



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-34565) Collapse Window nodes with Project between them

2021-02-27 Thread Tanel Kiis (Jira)
Tanel Kiis created SPARK-34565:
--

 Summary: Collapse Window nodes with Project between them
 Key: SPARK-34565
 URL: https://issues.apache.org/jira/browse/SPARK-34565
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.2.0
Reporter: Tanel Kiis


The CollapseWindow optimizer rule can be improved to also collapse Window nodes
that have a Project between them. This sort of Window - Project - Window chain
will happen when chaining the dataframe.withColumn calls.
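A hedged sketch of the chain described above (df and the column names are illustrative):

{code:java}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val w = Window.partitionBy(col("a")).orderBy(col("b"))

// Each withColumn adds a Project on top of the previous Window node,
// producing a Window - Project - Window chain over the same window spec.
df.withColumn("s", sum(col("c")).over(w))
  .withColumn("m", max(col("c")).over(w))
{code}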



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-34141) ExtractGenerator analyzer should handle lazy projectlists

2021-01-16 Thread Tanel Kiis (Jira)
Tanel Kiis created SPARK-34141:
--

 Summary: ExtractGenerator analyzer should handle lazy projectlists
 Key: SPARK-34141
 URL: https://issues.apache.org/jira/browse/SPARK-34141
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.2.0
Reporter: Tanel Kiis


With the DataFrame API it is possible to have a lazy sequence as the output
field of a LogicalPlan class. When exploding a column on such a dataframe using
the withColumn method, the ExtractGenerator rule does not extract the generator.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-34014) Ignore Distinct if it is the right child of a left semi or anti join

2021-01-06 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis resolved SPARK-34014.

Resolution: Won't Fix

Can cause performance regression

> Ignore Distinct if it is the right child of a left semi or anti join
> 
>
> Key: SPARK-34014
> URL: https://issues.apache.org/jira/browse/SPARK-34014
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Tanel Kiis
>Priority: Major
>
> Left semi and anti joins ignore duplicates in the right child. Finding 
> distinct values there will only add overhead.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-34014) Ignore Distinct if it is the right child of a left semi or anti join

2021-01-05 Thread Tanel Kiis (Jira)
Tanel Kiis created SPARK-34014:
--

 Summary: Ignore Distinct if it is the right child of a left semi 
or anti join
 Key: SPARK-34014
 URL: https://issues.apache.org/jira/browse/SPARK-34014
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.2.0
Reporter: Tanel Kiis


Left semi and anti joins ignore duplicates in the right child. Finding distinct 
values there will only add overhead.
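A hedged sketch of the pattern (left and right are illustrative DataFrames sharing an id column):

{code:java}
// The distinct() on the right side of a LEFT SEMI join only adds an extra
// aggregation: the semi join already returns each left row at most once,
// regardless of duplicates on the right.
left.join(right.select("id").distinct(), Seq("id"), "left_semi")
// ...produces the same result as:
left.join(right.select("id"), Seq("id"), "left_semi")
{code}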



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-33971) Eliminate distinct from more aggregates

2021-01-03 Thread Tanel Kiis (Jira)
Tanel Kiis created SPARK-33971:
--

 Summary: Eliminate distinct from more aggregates
 Key: SPARK-33971
 URL: https://issues.apache.org/jira/browse/SPARK-33971
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.2.0
Reporter: Tanel Kiis


The EliminateDistinct rule removes distinct from min and max aggregates.
It could remove it from several more aggregates:
* BitAndAgg
* BitOrAgg
* First
* Last
* HyperLogLogPlusPlus
* CollectSet
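An illustrative pair of rewrites the extended rule could perform, using two aggregates from the list above (table t and column a are placeholders):

{code:java}
// DISTINCT adds extra aggregation work but cannot change the result of these
// aggregates: a set already removes duplicates and bit_or is insensitive to
// repeated inputs.
spark.sql("SELECT collect_set(DISTINCT a), bit_or(DISTINCT a) FROM t")
// ...can be rewritten to:
spark.sql("SELECT collect_set(a), bit_or(a) FROM t")
{code}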



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-33964) Combine distinct unions in more cases

2021-01-02 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-33964:
---
Description: 
In several TPCDS queries the CombineUnions rule does not manage to combine 
unions, because they have noop Projects between them.
The Projects will be removed by RemoveNoopOperators, but by then 
ReplaceDistinctWithAggregate has been applied and there are aggregates between 
the unions.

> Combine distinct unions in more cases
> -
>
> Key: SPARK-33964
> URL: https://issues.apache.org/jira/browse/SPARK-33964
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Tanel Kiis
>Priority: Major
>
> In several TPCDS queries the CombineUnions rule does not manage to combine 
> unions, because they have noop Projects between them.
> The Projects will be removed by RemoveNoopOperators, but by then 
> ReplaceDistinctWithAggregate has been applied and there are aggregates 
> between the unions.
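A hedged sketch of one shape that can hit this (df1, df2, df3 are illustrative DataFrames with the same schema):

{code:java}
// Each .select("*") leaves a no-op Project around the unions, so CombineUnions
// does not fire before ReplaceDistinctWithAggregate turns the distincts into
// Aggregates sitting between the Union nodes.
df1.select("*").union(df2.select("*")).distinct()
   .select("*").union(df3.select("*")).distinct()
{code}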



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-33964) Combine distinct unions in more cases

2021-01-02 Thread Tanel Kiis (Jira)
Tanel Kiis created SPARK-33964:
--

 Summary: Combine distinct unions in more cases
 Key: SPARK-33964
 URL: https://issues.apache.org/jira/browse/SPARK-33964
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.2.0
Reporter: Tanel Kiis






--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-33935) Fix CBOs cost function

2020-12-29 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-33935:
---
Issue Type: Bug  (was: Improvement)

> Fix CBOs cost function 
> ---
>
> Key: SPARK-33935
> URL: https://issues.apache.org/jira/browse/SPARK-33935
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Tanel Kiis
>Priority: Major
>
> The parameter spark.sql.cbo.joinReorder.card.weight is documented as:
> {code:title=spark.sql.cbo.joinReorder.card.weight}
> The weight of cardinality (number of rows) for plan cost comparison in join 
> reorder: rows * weight + size * (1 - weight).
> {code}
> But in the implementation the formula is a bit different:
> {code:title=Current implementation}
> def betterThan(other: JoinPlan, conf: SQLConf): Boolean = {
>   if (other.planCost.card == 0 || other.planCost.size == 0) {
> false
>   } else {
> val relativeRows = BigDecimal(this.planCost.card) / 
> BigDecimal(other.planCost.card)
> val relativeSize = BigDecimal(this.planCost.size) / 
> BigDecimal(other.planCost.size)
> relativeRows * conf.joinReorderCardWeight +
>   relativeSize * (1 - conf.joinReorderCardWeight) < 1
>   }
> }
> {code}
> This change has an unfortunate consequence:
> given two plans A and B, both A betterThan B and B betterThan A might give
> the same result. This happens when one plan has many rows with small sizes and
> the other has few rows with large sizes.
> Example values that exhibit this phenomenon with the default weight value (0.7):
> A.card = 500, B.card = 300
> A.size = 30, B.size = 80
> Both A betterThan B and B betterThan A would have a score above 1 and would
> return false.
> A new implementation is proposed, that matches the documentation:
> {code:title=Proposed implementation}
> def betterThan(other: JoinPlan, conf: SQLConf): Boolean = {
>   val oldCost = BigDecimal(this.planCost.card) * 
> conf.joinReorderCardWeight +
> BigDecimal(this.planCost.size) * (1 - conf.joinReorderCardWeight)
>   val newCost = BigDecimal(other.planCost.card) * 
> conf.joinReorderCardWeight +
> BigDecimal(other.planCost.size) * (1 - conf.joinReorderCardWeight)
>   newCost < oldCost
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-33935) Fix CBOs cost function

2020-12-29 Thread Tanel Kiis (Jira)
Tanel Kiis created SPARK-33935:
--

 Summary: Fix CBOs cost function 
 Key: SPARK-33935
 URL: https://issues.apache.org/jira/browse/SPARK-33935
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.2.0
Reporter: Tanel Kiis


The parameter spark.sql.cbo.joinReorder.card.weight is documented as:
{code:title=spark.sql.cbo.joinReorder.card.weight}
The weight of cardinality (number of rows) for plan cost comparison in join 
reorder: rows * weight + size * (1 - weight).
{code}

But in the implementation the formula is a bit different:
{code:title=Current implementation}
def betterThan(other: JoinPlan, conf: SQLConf): Boolean = {
  if (other.planCost.card == 0 || other.planCost.size == 0) {
false
  } else {
val relativeRows = BigDecimal(this.planCost.card) / 
BigDecimal(other.planCost.card)
val relativeSize = BigDecimal(this.planCost.size) / 
BigDecimal(other.planCost.size)
relativeRows * conf.joinReorderCardWeight +
  relativeSize * (1 - conf.joinReorderCardWeight) < 1
  }
}
{code}

This change has an unfortunate consequence:
given two plans A and B, both A betterThan B and B betterThan A might give the
same result. This happens when one plan has many rows with small sizes and the
other has few rows with large sizes.

Example values that exhibit this phenomenon with the default weight value (0.7):
A.card = 500, B.card = 300
A.size = 30, B.size = 80
Both A betterThan B and B betterThan A would have a score above 1 and would
return false.

A new implementation is proposed, that matches the documentation:
{code:title=Proposed implementation}
def betterThan(other: JoinPlan, conf: SQLConf): Boolean = {
  val oldCost = BigDecimal(this.planCost.card) * conf.joinReorderCardWeight 
+
BigDecimal(this.planCost.size) * (1 - conf.joinReorderCardWeight)
  val newCost = BigDecimal(other.planCost.card) * 
conf.joinReorderCardWeight +
BigDecimal(other.planCost.size) * (1 - conf.joinReorderCardWeight)
  newCost < oldCost
}
{code}
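A small self-contained check of the example numbers above; this is not Spark code, just the two formulas side by side with the assumed default weight 0.7.

{code:java}
object CostFunctionCheck extends App {
  val w   = BigDecimal("0.7")
  val one = BigDecimal(1)
  val (cardA, sizeA) = (BigDecimal(500), BigDecimal(30))
  val (cardB, sizeB) = (BigDecimal(300), BigDecimal(80))

  // Current implementation: weighted relative ratios compared against 1.
  def betterThanCurrent(c1: BigDecimal, s1: BigDecimal,
                        c2: BigDecimal, s2: BigDecimal): Boolean =
    (c1 / c2) * w + (s1 / s2) * (one - w) < one

  println(betterThanCurrent(cardA, sizeA, cardB, sizeB)) // false, score ~1.28
  println(betterThanCurrent(cardB, sizeB, cardA, sizeA)) // false, score ~1.22

  // Proposed implementation: weighted absolute costs give a total order.
  def cost(c: BigDecimal, s: BigDecimal): BigDecimal = c * w + s * (one - w)

  println(cost(cardB, sizeB) < cost(cardA, sizeA))       // true: B beats A
}
{code}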



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-33070) Optimizer rules for HigherOrderFunctions

2020-12-25 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-33070:
---
Summary: Optimizer rules for HigherOrderFunctions  (was: Optimizer rules 
for collection datatypes and SimpleHigherOrderFunction)

> Optimizer rules for HigherOrderFunctions
> 
>
> Key: SPARK-33070
> URL: https://issues.apache.org/jira/browse/SPARK-33070
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Tanel Kiis
>Priority: Minor
>
> SimpleHigherOrderFunction like ArrayTransform, ArrayFilter, etc, can be 
> combined and reordered to achieve more optimal plan.
> Possible rules:
> * Combine 2 consecutive array transforms
> * Combine 2 consecutive array filters
> * Push array filter through array sort
> * Remove array sort before array exists and array forall.
> * Combine 2 consecutive map filters



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-33070) Optimizer rules for collection datatypes and SimpleHigherOrderFunction

2020-12-25 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-33070:
---
Affects Version/s: (was: 3.1.0)
   3.2.0

> Optimizer rules for collection datatypes and SimpleHigherOrderFunction
> --
>
> Key: SPARK-33070
> URL: https://issues.apache.org/jira/browse/SPARK-33070
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Tanel Kiis
>Priority: Minor
>
> SimpleHigherOrderFunction like ArrayTransform, ArrayFilter, etc, can be 
> combined and reordered to achieve more optimal plan.
> Possible rules:
> * Combine 2 consecutive array transforms
> * Combine 2 consecutive array filters
> * Push array filter through array sort
> * Remove array sort before array exists and array forall.
> * Combine 2 consecutive map filters



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-33851) Push partial aggregates below exchanges

2020-12-19 Thread Tanel Kiis (Jira)
Tanel Kiis created SPARK-33851:
--

 Summary: Push partial aggregates below exchanges
 Key: SPARK-33851
 URL: https://issues.apache.org/jira/browse/SPARK-33851
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.2.0
Reporter: Tanel Kiis


The `EnsureRequirements` rule inserts exchanges between partial and final
aggregates, so the partial aggregation reduces the exchanged data amount.
However, if the user manually partitions the data correctly, then the partial
aggregates will end up after the exchange.

If possible we should push partial aggregates below the manually inserted
exchanges to reduce the exchanged data amount.
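A hedged sketch of the manual-partitioning case (df, key and value are illustrative):

{code:java}
import org.apache.spark.sql.functions._

// The user repartitions by the grouping key, so no further exchange is added;
// both the partial and the final aggregate run above the user's repartition,
// and the full unaggregated data goes over the network.
df.repartition(col("key"))
  .groupBy(col("key"))
  .agg(sum(col("value")))
// Pushing the partial aggregate below the manual repartition would shrink
// the exchanged data, as described above.
{code}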



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-32110) -0.0 vs 0.0 is inconsistent

2020-12-10 Thread Tanel Kiis (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17247357#comment-17247357
 ] 

Tanel Kiis commented on SPARK-32110:


[~cloud_fan] fixed the issue I mentioned in the first comment in this PR:
https://github.com/apache/spark/pull/29647

> -0.0 vs 0.0 is inconsistent
> ---
>
> Key: SPARK-32110
> URL: https://issues.apache.org/jira/browse/SPARK-32110
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Robert Joseph Evans
>Assignee: Wenchen Fan
>Priority: Major
> Fix For: 3.0.2, 3.1.0
>
>
> This is related to SPARK-26021 where some things were fixed but there is 
> still a lot that is not consistent.
> When parsing SQL {{-0.0}} is turned into {{0.0}}. This can produce quick 
> results that appear to be correct but are totally inconsistent for the same 
> operators.
> {code:java}
> scala> import spark.implicits._
> import spark.implicits._
> scala> spark.sql("SELECT 0.0 = -0.0").collect
> res0: Array[org.apache.spark.sql.Row] = Array([true])
> scala> Seq((0.0, -0.0)).toDF("a", "b").selectExpr("a = b").collect
> res1: Array[org.apache.spark.sql.Row] = Array([false])
> {code}
> This also shows up in sorts
> {code:java}
> scala> Seq((0.0, -100.0), (-0.0, 100.0), (0.0, 100.0), (-0.0, 
> -100.0)).toDF("a", "b").orderBy("a", "b").collect
> res2: Array[org.apache.spark.sql.Row] = Array([-0.0,-100.0], [-0.0,100.0], 
> [0.0,-100.0], [0.0,100.0])
> {code}
> But not for a equi-join or for an aggregate
> {code:java}
> scala> Seq((0.0, -0.0)).toDF("a", "b").join(Seq((-0.0, 0.0)).toDF("r_a", 
> "r_b"), $"a" === $"r_a").collect
> res3: Array[org.apache.spark.sql.Row] = Array([0.0,-0.0,-0.0,0.0])
> scala> Seq((0.0, 1.0), (-0.0, 1.0)).toDF("a", "b").groupBy("a").count.collect
> res6: Array[org.apache.spark.sql.Row] = Array([0.0,2])
> {code}
> This can lead to some very odd results. Like an equi-join with a filter that 
> logically should do nothing, but ends up filtering the result to nothing.
> {code:java}
> scala> Seq((0.0, -0.0)).toDF("a", "b").join(Seq((-0.0, 0.0)).toDF("r_a", 
> "r_b"), $"a" === $"r_a" && $"a" <= $"r_a").collect
> res8: Array[org.apache.spark.sql.Row] = Array()
> scala> Seq((0.0, -0.0)).toDF("a", "b").join(Seq((-0.0, 0.0)).toDF("r_a", 
> "r_b"), $"a" === $"r_a").collect
> res9: Array[org.apache.spark.sql.Row] = Array([0.0,-0.0,-0.0,0.0])
> {code}
> Hive never normalizes -0.0 to 0.0 so this results in non-ieee complaint 
> behavior everywhere, but at least it is consistently odd.
> MySQL, Oracle, Postgres, and SQLite all appear to normalize the {{-0.0}} to 
> {{0.0}}.
> The root cause of this appears to be that the java implementation of 
> {{Double.compare}} and {{Float.compare}} for open JDK places {{-0.0}} < 
> {{0.0}}.
> This is not documented in the java docs but it is clearly documented in the 
> code, so it is not a "bug" that java is going to fix.
> [https://github.com/openjdk/jdk/blob/a0a0539b0d3f9b6809c9759e697bfafd7b138ec1/src/java.base/share/classes/java/lang/Double.java#L1022-L1035]
> It is also consistent with what is in the java docs for {{Double.equals}}
>  
> [https://docs.oracle.com/javase/8/docs/api/java/lang/Double.html#equals-java.lang.Object-]
> To be clear I am filing this mostly to document the current state rather than 
> to think it needs to be fixed ASAP. It is a rare corner case, but ended up 
> being really frustrating for me to debug what was happening.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-33225) Extract AliasHelper trait

2020-10-22 Thread Tanel Kiis (Jira)
Tanel Kiis created SPARK-33225:
--

 Summary: Extract AliasHelper trait
 Key: SPARK-33225
 URL: https://issues.apache.org/jira/browse/SPARK-33225
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.1.0
Reporter: Tanel Kiis


During SPARK-33122 we saw that there are several alias-related methods
duplicated between the optimizer and the analyzer. To keep that PR more concise,
let's extract AliasHelper in a separate PR.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-33177) CollectList and CollectSet should not be nullable

2020-10-18 Thread Tanel Kiis (Jira)
Tanel Kiis created SPARK-33177:
--

 Summary: CollectList and CollectSet should not be nullable
 Key: SPARK-33177
 URL: https://issues.apache.org/jira/browse/SPARK-33177
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.1.0
Reporter: Tanel Kiis


The CollectList and CollectSet SQL expressions never return a null value.
Marking them as non-nullable can have some performance benefits, because some
optimizer rules apply only to non-nullable expressions.
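A hedged illustration of the non-nullability claim (a SparkSession named spark is assumed to be in scope):

{code:java}
import org.apache.spark.sql.functions._

// Even over an empty input the result is an empty array, never NULL.
spark.range(0).agg(collect_list("id"), collect_set("id")).show()
// Expected output: [] and []
{code}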



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-33122) Remove redundant aggregates in the Optimizer

2020-10-12 Thread Tanel Kiis (Jira)
Tanel Kiis created SPARK-33122:
--

 Summary: Remove redundant aggregates in the Optimizer
 Key: SPARK-33122
 URL: https://issues.apache.org/jira/browse/SPARK-33122
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.0.1
Reporter: Tanel Kiis


It is possible to have two or more consecutive aggregates whose sole purpose is
to keep only distinct values (for example TPCDS q87). We can remove all but the
last one to improve performance.
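A hedged sketch of the pattern (df and the column names are illustrative):

{code:java}
// Two consecutive distinct-keeping Aggregates; only the outer one is needed,
// because the final distinct over "a" already removes all duplicates.
df.select("a", "b").distinct()
  .select("a").distinct()
{code}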



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-33070) Optimizer rules for collection datatypes and SimpleHigherOrderFunction

2020-10-05 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-33070:
---
Summary: Optimizer rules for collection datatypes and 
SimpleHigherOrderFunction  (was: Optimizer rules for SimpleHigherOrderFunction)

> Optimizer rules for collection datatypes and SimpleHigherOrderFunction
> --
>
> Key: SPARK-33070
> URL: https://issues.apache.org/jira/browse/SPARK-33070
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Tanel Kiis
>Priority: Minor
>
> SimpleHigherOrderFunction like ArrayTransform, ArrayFilter, etc, can be 
> combined and reordered to achieve more optimal plan.
> Possible rules:
> * Combine 2 consecutive array transforms
> * Combine 2 consecutive array filters
> * Push array filter through array sort
> * Remove array sort before array exists and array forall.
> * Combine 2 consecutive map filters



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-33070) Optimizer rules for SimpleHigherOrderFunction

2020-10-05 Thread Tanel Kiis (Jira)
Tanel Kiis created SPARK-33070:
--

 Summary: Optimizer rules for SimpleHigherOrderFunction
 Key: SPARK-33070
 URL: https://issues.apache.org/jira/browse/SPARK-33070
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.1.0
Reporter: Tanel Kiis


SimpleHigherOrderFunctions like ArrayTransform, ArrayFilter, etc. can be
combined and reordered to achieve a more optimal plan.

Possible rules:
* Combine 2 consecutive array transforms
* Combine 2 consecutive array filters
* Push array filter through array sort
* Remove array sort before array exists and array forall.
* Combine 2 consecutive map filters
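A hedged sketch of the first listed rule (df and the array column xs are illustrative):

{code:java}
import org.apache.spark.sql.functions._

// Two consecutive array transforms...
val twoPasses = df.select(
  transform(transform(col("xs"), x => x + 1), x => x * 2).as("ys"))

// ...can be fused into a single pass over the array.
val onePass = df.select(
  transform(col("xs"), x => (x + 1) * 2).as("ys"))
{code}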




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-33070) Optimizer rules for SimpleHigherOrderFunction

2020-10-05 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-33070:
---
Priority: Minor  (was: Major)

> Optimizer rules for SimpleHigherOrderFunction
> -
>
> Key: SPARK-33070
> URL: https://issues.apache.org/jira/browse/SPARK-33070
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Tanel Kiis
>Priority: Minor
>
> SimpleHigherOrderFunction like ArrayTransform, ArrayFilter, etc, can be 
> combined and reordered to achieve more optimal plan.
> Possible rules:
> * Combine 2 consecutive array transforms
> * Combine 2 consecutive array filters
> * Push array filter through array sort
> * Remove array sort before array exists and array forall.
> * Combine 2 consecutive map filters



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-32995) CostBasedJoinReorder optimizer rule should be idempotent

2020-09-25 Thread Tanel Kiis (Jira)
Tanel Kiis created SPARK-32995:
--

 Summary: CostBasedJoinReorder optimizer rule should be idempotent
 Key: SPARK-32995
 URL: https://issues.apache.org/jira/browse/SPARK-32995
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.1.0
Reporter: Tanel Kiis


The CostBasedJoinReorder rule currently has the following comment:
{code}
// Since join costs in AQP can change between multiple runs, there is no 
reason that we have an
// idempotence enforcement on this batch. We thus make it FixedPoint(1) 
instead of Once.
{code} 

This reasoning is incorrect.
In the optimizer, a batch is idempotent when applying it a second time in a row
(without changing anything in between) does not change the plan at all.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-32970) Reduce the runtime of unit test for SPARK-32019

2020-09-22 Thread Tanel Kiis (Jira)
Tanel Kiis created SPARK-32970:
--

 Summary: Reduce the runtime of unit test for SPARK-32019
 Key: SPARK-32970
 URL: https://issues.apache.org/jira/browse/SPARK-32970
 Project: Spark
  Issue Type: Improvement
  Components: SQL, Tests
Affects Versions: 3.1.0
Reporter: Tanel Kiis


The UT for SPARK-32019 can run for over 7 minutes on Jenkins.
This sort of simple UT should run in a few seconds - definitely in less than a
minute.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-32928) Non-deterministic expressions should not be reordered inside AND and OR

2020-09-21 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-32928:
---
Labels: correctness  (was: CorrectnessBug)

> Non-deterministic expressions should not be reordered inside AND and OR
> ---
>
> Key: SPARK-32928
> URL: https://issues.apache.org/jira/browse/SPARK-32928
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Tanel Kiis
>Priority: Major
>  Labels: correctness
>
> Using the splitDisjunctivePredicates and splitConjunctivePredicates helper 
> methods can change the number of times a non-deterministic expression is 
> executed. This can cause correctness issues on the client side.
> An existing test in the FilterPushdownSuite seems to exhibit this problem
> {code}
> test("generate: non-deterministic predicate referenced no generated column") {
> val originalQuery = {
>   testRelationWithArrayType
> .generate(Explode('c_arr), alias = Some("arr"))
> .where(('b >= 5) && ('a + Rand(10).as("rnd") > 6) && ('col > 6))
> }
> val optimized = Optimize.execute(originalQuery.analyze)
> val correctAnswer = {
>   testRelationWithArrayType
> .where('b >= 5)
> .generate(Explode('c_arr), alias = Some("arr"))
> .where('a + Rand(10).as("rnd") > 6 && 'col > 6)
> .analyze
> }
> comparePlans(optimized, correctAnswer)
>   }
> {code}
> In the optimized plan, the deterministic filter is moved ahead of the 
> non-deterministic one:
> {code}
> Filter ((6 < none#0) AND (cast(6 as double) < (rand(10) + cast(none#0 as 
> double
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-32928) Non-deterministic expressions should not be reordered inside AND and OR

2020-09-21 Thread Tanel Kiis (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanel Kiis updated SPARK-32928:
---
Labels: CorrectnessBug  (was: )

> Non-deterministic expressions should not be reordered inside AND and OR
> ---
>
> Key: SPARK-32928
> URL: https://issues.apache.org/jira/browse/SPARK-32928
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Tanel Kiis
>Priority: Major
>  Labels: CorrectnessBug
>
> Using the splitDisjunctivePredicates and splitConjunctivePredicates helper 
> methods can change the number of times a non-deterministic expression is 
> executed. This can cause correctness issues on the client side.
> An existing test in the FilterPushdownSuite seems to exhibit this problem
> {code}
> test("generate: non-deterministic predicate referenced no generated column") {
> val originalQuery = {
>   testRelationWithArrayType
> .generate(Explode('c_arr), alias = Some("arr"))
> .where(('b >= 5) && ('a + Rand(10).as("rnd") > 6) && ('col > 6))
> }
> val optimized = Optimize.execute(originalQuery.analyze)
> val correctAnswer = {
>   testRelationWithArrayType
> .where('b >= 5)
> .generate(Explode('c_arr), alias = Some("arr"))
> .where('a + Rand(10).as("rnd") > 6 && 'col > 6)
> .analyze
> }
> comparePlans(optimized, correctAnswer)
>   }
> {code}
> In the optimized plan, the deterministic filter is moved ahead of the 
> non-deterministic one:
> {code}
> Filter ((6 < none#0) AND (cast(6 as double) < (rand(10) + cast(none#0 as 
> double
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-32928) Non-deterministic expressions should not be reordered inside AND and OR

2020-09-21 Thread Tanel Kiis (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17199646#comment-17199646
 ] 

Tanel Kiis commented on SPARK-32928:


One more place where this can manifest is FilterExec reordering isNotNull
predicates:

{code:title=Test SQL file}
-- Test window operator with codegen on and off.
--CONFIG_DIM1 spark.sql.codegen.wholeStage=true
--CONFIG_DIM1 
spark.sql.codegen.wholeStage=false,spark.sql.codegen.factoryMode=CODEGEN_ONLY
--CONFIG_DIM1 
spark.sql.codegen.wholeStage=false,spark.sql.codegen.factoryMode=NO_CODEGEN

CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES
1, 2, 3, 4, 5, 6, 7, 8, 9, 10
AS testData(a);

SELECT a FROM testData WHERE NOT ISNULL(IF(RAND(0) > 0.5, NULL, a)) AND RAND(1) 
> 0.5;
{code}

{code:title=Generated output file}
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 2


-- !query
CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES
1, 2, 3, 4, 5, 6, 7, 8, 9, 10
AS testData(a)
-- !query schema
struct<>
-- !query output



-- !query
SELECT a FROM testData WHERE NOT ISNULL(IF(RAND(0) > 0.5, NULL, a)) AND RAND(1) 
> 0.5
-- !query schema
struct
-- !query output
3
4
8
{code}

{code:title=Error on running the test}
23:16:44.013 ERROR org.apache.spark.sql.SQLQueryTestSuite: Error using configs: 
spark.sql.codegen.wholeStage=false,spark.sql.codegen.factoryMode=CODEGEN_ONLY
[info] - deterministic.sql *** FAILED *** (1 second, 955 milliseconds)
[info]   deterministic.sql
[info]   Expected "3
[info]   4
[info]   8[]", but got "3
[info]   4
[info]   8[
[info]   9]" Result did not match for query #1
{code}

> Non-deterministic expressions should not be reordered inside AND and OR
> ---
>
> Key: SPARK-32928
> URL: https://issues.apache.org/jira/browse/SPARK-32928
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Tanel Kiis
>Priority: Major
>
> Using the splitDisjunctivePredicates and splitConjunctivePredicates helper 
> methods can change the number of times a non-deterministic expression is 
> executed. This can cause correctness issues on the client side.
> An existing test in the FilterPushdownSuite seems to exhibit this problem
> {code}
> test("generate: non-deterministic predicate referenced no generated column") {
> val originalQuery = {
>   testRelationWithArrayType
> .generate(Explode('c_arr), alias = Some("arr"))
> .where(('b >= 5) && ('a + Rand(10).as("rnd") > 6) && ('col > 6))
> }
> val optimized = Optimize.execute(originalQuery.analyze)
> val correctAnswer = {
>   testRelationWithArrayType
> .where('b >= 5)
> .generate(Explode('c_arr), alias = Some("arr"))
> .where('a + Rand(10).as("rnd") > 6 && 'col > 6)
> .analyze
> }
> comparePlans(optimized, correctAnswer)
>   }
> {code}
> In the optimized plan, the deterministic filter is moved ahead of the 
> non-deterministic one:
> {code}
> Filter ((6 < none#0) AND (cast(6 as double) < (rand(10) + cast(none#0 as 
> double
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org


