Hello Winston,

I tried the suggested code snippet, but I am getting the following
error:

```
value listenerManager is not a member of org.apache.spark.sql.SparkSession
```

However, I can see that `listenerManager` is listed in the API docs:

https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/SparkSession.html
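
One way to narrow this down is sketched below, under the assumption that the
build may be picking up the Spark Connect client jar (Spark Connect comes up
earlier in this thread) rather than the regular spark-sql jar. The snippet
prints which jar `SparkSession` was loaded from and whether that class exposes
a `listenerManager` method. It uses only JVM reflection and can be pasted into
a Scala REPL or a `main` method.

```scala
import org.apache.spark.sql.SparkSession

// The class object for whichever SparkSession is on the classpath
val cls = classOf[SparkSession]

// Location of the jar the class came from (getCodeSource may be null, hence Option)
val location = Option(cls.getProtectionDomain.getCodeSource)
  .map(_.getLocation.toString)
  .getOrElse("<unknown>")
println(s"SparkSession loaded from: $location")

// Check via reflection whether a public listenerManager method exists
val hasListenerManager = cls.getMethods.exists(_.getName == "listenerManager")
println(s"listenerManager available: $hasListenerManager")
```

If the second line prints `false`, the compile error would be consistent with
the class on the classpath differing from the one described in the API docs.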

With Regards,
Vibhatha Abeykoon, PhD


On Wed, Aug 2, 2023 at 4:22 PM Vibhatha Abeykoon <vibha...@gmail.com> wrote:

> Hello Winston,
>
> Thanks again for this response, I will check this one out.
>
> On Wed, Aug 2, 2023 at 3:50 PM Winston Lai <weiruanl...@gmail.com> wrote:
>
>>
>> Hi Vibhatha,
>>
>> I helped you post this question to another community. There is one answer
>> by someone else for your reference.
>>
>> To access the logical plan or optimized plan, you can register a custom
>> QueryExecutionListener and retrieve the plans during the query execution
>> process. Here's an example of how to do it in Scala:
>>
>> > import org.apache.spark.sql.SparkSession
>> > import org.apache.spark.sql.execution.QueryExecution
>> > import org.apache.spark.sql.util.QueryExecutionListener
>> >
>> > // Create a custom QueryExecutionListener
>> > class CustomQueryExecutionListener extends QueryExecutionListener {
>> >   override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
>> >     // Retrieve the logical plan
>> >     val logicalPlan = qe.logical
>> >
>> >     // Retrieve the optimized plan
>> >     val optimizedPlan = qe.optimizedPlan
>> >
>> >     // Process the plans with your custom function
>> >     // (processPlans is a placeholder; it is not defined in this snippet)
>> >     processPlans(logicalPlan, optimizedPlan)
>> >   }
>> >
>> >   override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
>> > }
>> >
>> > // Create a SparkSession
>> > val spark = SparkSession.builder()
>> >   .appName("Example")
>> >   .getOrCreate()
>> >
>> > // Register the custom QueryExecutionListener
>> > spark.listenerManager.register(new CustomQueryExecutionListener)
>> >
>> > // Perform your DataFrame operations
>> > // (the path and column names are placeholders; the header option is
>> > // needed so that columns can be referenced by name)
>> > val df = spark.read
>> >   .option("header", "true")
>> >   .option("inferSchema", "true")
>> >   .csv("path/to/file.csv")
>> > val filteredDF = df.filter(df("column") > 10)
>> > val resultDF = filteredDF.select("column1", "column2")
>> >
>> > // Trigger the execution of the DF to invoke the listener
>> > resultDF.show()
>>
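>> Note that `processPlans` is not defined in the snippet above. Here is a
>> minimal sketch of what such a function might look like, assuming it only
>> needs to inspect the operators in each plan. The body is purely
>> illustrative and should be replaced with your own logic:
>>
>> ```scala
>> import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
>>
>> // Hypothetical stand-in for the user's own plan-processing function
>> def processPlans(logical: LogicalPlan, optimized: LogicalPlan): Unit = {
>>   // Walk each plan tree and collect the operator name of every node
>>   val logicalOps = logical.collect { case node => node.nodeName }
>>   val optimizedOps = optimized.collect { case node => node.nodeName }
>>
>>   println(s"Logical operators:   ${logicalOps.mkString(", ")}")
>>   println(s"Optimized operators: ${optimizedOps.mkString(", ")}")
>>
>>   // treeString renders the whole plan as an indented tree
>>   println(optimized.treeString)
>> }
>> ```
>>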
>> Thank You & Best Regards
>> Winston Lai
>> ------------------------------
>> *From:* Vibhatha Abeykoon <vibha...@gmail.com>
>> *Sent:* Wednesday, August 2, 2023 5:03:15 PM
>> *To:* Ruifeng Zheng <zrfli...@gmail.com>
>> *Cc:* Winston Lai <weiruanl...@gmail.com>; user@spark.apache.org <
>> user@spark.apache.org>
>> *Subject:* Re: Extracting Logical Plan
>>
>> I understand. I sort of drew the same conclusion. But I wasn’t sure.
>> Thanks everyone for taking time on this.
>>
>> On Wed, Aug 2, 2023 at 2:29 PM Ruifeng Zheng <zrfli...@gmail.com> wrote:
>>
>> In Spark Connect, I think the only API to show optimized plan is
>> `df.explain("extended")` as Winston mentioned, but it is not a LogicalPlan
>> object.
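>>
>> If the plan text is enough, one possible workaround is to capture what
>> `explain` prints. This is only a sketch: `explainAsText` is a hypothetical
>> helper name, and it assumes `explain` writes through Scala's `Console`
>> (i.e. via `println`), which should be checked against the Spark version
>> in use. The result is a plain string, not a LogicalPlan object.
>>
>> ```scala
>> import java.io.ByteArrayOutputStream
>> import org.apache.spark.sql.Dataset
>>
>> // Hypothetical helper: run explain("extended") and return its printed output
>> def explainAsText(ds: Dataset[_]): String = {
>>   val buffer = new ByteArrayOutputStream()
>>   // Redirect Console output for the duration of the explain call
>>   Console.withOut(buffer) {
>>     ds.explain("extended")
>>   }
>>   buffer.toString("UTF-8")
>> }
>> ```
>>
>> Usage would look like `val planText = explainAsText(df)`.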
>>
>> On Wed, Aug 2, 2023 at 4:36 PM Vibhatha Abeykoon <vibha...@gmail.com>
>> wrote:
>>
>> Hello Ruifeng,
>>
>> Thank you for these pointers. Would it be different if I use Spark
>> Connect? I am not using the regular SparkSession. I am pretty new to these
>> APIs. Appreciate your thoughts.
>>
>> On Wed, Aug 2, 2023 at 2:00 PM Ruifeng Zheng <zrfli...@gmail.com> wrote:
>>
>> Hi Vibhatha,
>>    I think those APIs are still available?
>>
>>
>>
>> ```
>> Welcome to
>>       ____              __
>>      / __/__  ___ _____/ /__
>>     _\ \/ _ \/ _ `/ __/  '_/
>>    /___/ .__/\_,_/_/ /_/\_\   version 3.4.1
>>       /_/
>>
>> Using Scala version 2.12.17 (OpenJDK 64-Bit Server VM, Java 11.0.19)
>> Type in expressions to have them evaluated.
>> Type :help for more information.
>>
>> scala> val df = spark.range(0, 10)
>> df: org.apache.spark.sql.Dataset[Long] = [id: bigint]
>>
>> scala> df.queryExecution
>> res0: org.apache.spark.sql.execution.QueryExecution =
>> == Parsed Logical Plan ==
>> Range (0, 10, step=1, splits=Some(12))
>>
>> == Analyzed Logical Plan ==
>> id: bigint
>> Range (0, 10, step=1, splits=Some(12))
>>
>> == Optimized Logical Plan ==
>> Range (0, 10, step=1, splits=Some(12))
>>
>> == Physical Plan ==
>> *(1) Range (0, 10, step=1, splits=12)
>>
>> scala> df.queryExecution.optimizedPlan
>> res1: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
>> Range (0, 10, step=1, splits=Some(12))
>> ```
>>
>>
>>
>> On Wed, Aug 2, 2023 at 3:58 PM Vibhatha Abeykoon <vibha...@gmail.com>
>> wrote:
>>
>> Hi Winston,
>>
>> I need to use the LogicalPlan object and process it with another function
>> I have written. In earlier Spark versions we can access that via the
>> dataframe object. So if it can be accessed via the UI, is there an API to
>> access the object?
>>
>> On Wed, Aug 2, 2023 at 1:24 PM Winston Lai <weiruanl...@gmail.com> wrote:
>>
>> Hi Vibhatha,
>>
>> How about reading the logical plan from Spark UI, do you have access to
>> the Spark UI? I am not sure what infra you run your Spark jobs on. Usually
>> you should be able to view the logical and physical plan under Spark UI in
>> text version at least. It is independent from the language (e.g.,
>> scala/Python/R) that you use to run Spark.
>>
>>
>> On Wednesday, August 2, 2023, Vibhatha Abeykoon <vibha...@gmail.com>
>> wrote:
>>
>> Hi Winston,
>>
>> I am looking for a way to access the LogicalPlan object in Scala. Not
>> sure if explain function would serve the purpose.
>>
>> On Wed, Aug 2, 2023 at 9:14 AM Winston Lai <weiruanl...@gmail.com> wrote:
>>
>> Hi Vibhatha,
>>
>> Have you tried pyspark.sql.DataFrame.explain — PySpark 3.4.1
>> documentation (apache.org)
>> <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.explain.html#pyspark.sql.DataFrame.explain>
>>  before?
>> I am not sure what infra you have, but you can try this first. If it
>> doesn't work, you may share more info such as what platform you are running
>> your Spark jobs on, what cloud services you are using ...
>>
>> On Wednesday, August 2, 2023, Vibhatha Abeykoon <vibha...@gmail.com>
>> wrote:
>>
>> Hello,
>>
>> I recently upgraded the Spark version to 3.4.1 and I have encountered a
>> few issues. In my previous code, I was able to extract the logical plan
>> using `df.queryExecution` (df: DataFrame and in Scala), but it seems like
>> in the latest API it is not supported. Is there a way to extract the
>> logical plan or optimized plan from a dataframe or dataset in Spark 3.4.1?
>>
>> Best,
>> Vibhatha
>>
>> --
>> Vibhatha Abeykoon
>>
>> --
>> Vibhatha Abeykoon
>>
>>
>>
>> --
>> Ruifeng Zheng
>> E-mail: zrfli...@gmail.com
>>
>
