[ 
https://issues.apache.org/jira/browse/SPARK-49218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18010620#comment-18010620
 ] 

Avi minsky commented on SPARK-49218:
------------------------------------

[~mpayson] i was able to also reproduce this with a different scenario which 
doesn't require caching:


{code:java}
val observer = Observation.apply("observer")
val rows = new util.ArrayList[Row]()

rows.add(RowFactory.create("bbbbb"))
val ds = spark.createDataFrame(rows, new StructType().add("aField", 
DataTypes.StringType))
    .withColumn("mypart", when(col("aField").notEqual("aaaa"), 
null).otherwise("bbbb"))
    .select("mypart","aField")
    .filter(col("mypart").isNotNull)

val withObserver = ds.observe(observer, count("*").as("cnt"))
withObserver
  .repartition(col("mypart"))
  .write.mode(SaveMode.Overwrite).partitionBy("mypart")
  .parquet("data/out/observer")
println(s"output is ${observer.get}") {code}
when running this the last println halts forever since the last physical plan 
doesn't remove the Metric collector

> Caching observed dataframes blocks metric retrieval when the dataframe is 
> empty
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-49218
>                 URL: https://issues.apache.org/jira/browse/SPARK-49218
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Spark Core
>    Affects Versions: 3.5.0, 3.5.1, 3.5.2
>         Environment: * Mac M2
>  * Python 3.11
>  * PySpark 3.5.2 running in local mode, installed via pip
>            Reporter: Max Payson
>            Priority: Major
>
> Caching observed dataframes blocks metric retrieval when the dataframe is 
> empty. This issue started in PySpark 3.5.0 and can be reproduced by running 
> the following script, which does not complete:
> {code:java}
> from pyspark.sql import SparkSession, Observation, functions as F
> spark = SparkSession.builder.getOrCreate()
> df = spark.createDataFrame([], "f: double")
> observation = Observation("count")
> observed_df = df.observe(observation, F.count(F.lit(1))) 
> observed_df.cache().collect()
> print(observation.get){code}
>  
> The issue can also be reproduced when reading a CSV with 0 records, or when 
> using additional select statements on the observed dataframe. Removing 
> `cache()` or downgrading to Spark 3.4.3 prints the expected result: 
> `"\{'count(1)': 0}"`.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to