Hi Michail,

With spark.conf.set("spark.sql.planChangeLog.level", "WARN") you can see
how Spark optimizes the query plan.

In PySpark, the plan is optimized into

Project ...
  +- CollectMetrics 2, [count(1) AS count(1)#200L]
     +- LocalTableScan <empty>, [col1#125, col2#126L, col3#127, col4#132L]

The entire join gets optimized away into an empty table. Apparently the
optimizer works out that df has no rows with col1 = 'c'. As a result, df
is never consumed / iterated, and the observation never receives any
metrics.

In Scala, the optimization is different:

*(2) Project ...
  +- CollectMetrics 2, [count(1) AS count(1)#63L]
     +- *(1) Project [col1#37, col2#38, col3#39, cast(null as int) AS col4#51]
        +- *(1) Filter (isnotnull(col1#37) AND (col1#37 = c))
           +- CollectMetrics 1, [count(1) AS count(1)#56L]
              +- LocalTableScan [col1#37, col2#38, col3#39]

where the join also gets optimized away, but table df is still filtered
for col1 = 'c', which iterates over the rows and collects the metrics
for observation 1.
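
The difference between the two plans can be mimicked with a toy model in
plain Python. This is only an analogy, not Spark code: observed,
run_scala_like_plan and run_pyspark_like_plan are hypothetical names, and
the assumption is that a metric is published only once the observed rows
have been fully consumed.

```python
def observed(rows, name, metrics):
    """Pass rows through unchanged and publish their count once exhausted."""
    count = 0
    for row in rows:
        count += 1
        yield row
    metrics[name] = count  # only reached if a consumer drains the generator


def run_scala_like_plan(source, metrics):
    # The filter still reads every source row, so observation "1" is fed.
    upstream = observed(source, "1", metrics)
    filtered = (row for row in upstream if row[0] == "c")
    return list(observed(filtered, "2", metrics))


def run_pyspark_like_plan(source, metrics):
    # Toy stand-in for the optimizer replacing the subtree by an empty
    # relation: the observed source is created but never iterated.
    _unused = observed(source, "1", metrics)
    return list(observed(iter(()), "2", metrics))


rows = [("a", 1, "1 2 3 4"), ("b", 2, "1 2 3 4")]

scala_metrics = {}
run_scala_like_plan(rows, scala_metrics)
print(scala_metrics)    # {'1': 2, '2': 0}

pyspark_metrics = {}
run_pyspark_like_plan(rows, pyspark_metrics)
print(pyspark_metrics)  # {'2': 0}
```

In the second run the key "1" never appears, which corresponds to
Observation("1") never receiving its metrics.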

Hope this helps to understand why there are no observed metrics for
Observation("1") in your case.

Enrico



On 04.12.23 at 10:45, Enrico Minack wrote:
Hi Michail,

observations as well as ordinary accumulators only observe / process
rows that are iterated / consumed by downstream stages. If the query
plan decides to skip one side of the join, that one will be removed from
the final plan completely. Then, the Observation will not retrieve any
metrics and .get waits forever. Definitely not helpful.

When creating the Observation class, we thought about a timeout for the
get method but could not find a use case where the user would call get
without first executing the query. Here is a scenario where, even though
the query is executed, there is no observation result. We will rethink this.
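
Until something like that exists, a caller could guard the blocking call
on their own side. The following is a minimal sketch in plain Python, not
part of the Spark API: get_with_timeout is a hypothetical helper that runs
any blocking callable in a daemon thread and gives up after a deadline.

```python
import queue
import threading


def get_with_timeout(blocking_get, timeout_seconds):
    """Return blocking_get() or raise TimeoutError after timeout_seconds."""
    result = queue.Queue(maxsize=1)

    def worker():
        # Hand either the value or the raised exception back to the caller.
        try:
            result.put(("ok", blocking_get()))
        except Exception as exc:
            result.put(("error", exc))

    # Daemon thread: a call that never returns does not block interpreter exit.
    threading.Thread(target=worker, daemon=True).start()

    try:
        status, value = result.get(timeout=timeout_seconds)
    except queue.Empty:
        raise TimeoutError(
            f"no result within {timeout_seconds} seconds"
        ) from None

    if status == "error":
        raise value
    return value


# Assumed usage with a PySpark Observation named o1 (not executed here):
#     metrics = get_with_timeout(lambda: o1.get, 30)
```

The abandoned thread keeps waiting in the background, so this only
unblocks the caller; it does not make the missing metrics appear.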

Interestingly, your example works in Scala:

import org.apache.spark.sql.Observation
import org.apache.spark.sql.functions.count

val df = Seq(("a", 1, "1 2 3 4"), ("b", 2, "1 2 3 4")).toDF("col1", "col2", "col3")
val df_join = Seq(("a", 6), ("b", 5)).toDF("col1", "col4")

val o1 = Observation()
val o2 = Observation()

val df1 = df.observe(o1, count("*")).filter("col1 = 'c'")
val df2 = df1.join(df_join, "col1", "left").observe(o2, count("*"))

df2.show()
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
+----+----+----+----+

o1.get
Map[String,Any] = Map(count(1) -> 2)

o2.get
Map[String,Any] = Map(count(1) -> 0)


PySpark and Scala should behave identically here. I will investigate.

Cheers,
Enrico



On 02.12.23 at 17:11, Михаил Кулаков wrote:
Hey folks, I am actively using the observe method in my Spark jobs and
noticed some interesting behavior.
Here is an example of working and non-working code:
https://gist.github.com/Coola4kov/8aeeb05abd39794f8362a3cf1c66519c

In short, if I join a dataframe after applying some filter rules and
it has become empty, observations configured on the first dataframe never
return any results, unless some action is called on the empty dataframe
explicitly before the join.

This looks like a bug to me. I would appreciate any advice on how to fix
this behavior.


