gengliangwang commented on code in PR #55493:
URL: https://github.com/apache/spark/pull/55493#discussion_r3237031193
##########
docs/declarative-pipelines-programming-guide.md:
##########
@@ -180,6 +180,20 @@ Your pipelines implemented with the Python API must import this module. It's rec
 from pyspark import pipelines as dp
 ```
 
+### The Spark Session in Python Pipelines
+
+The Spark session is automatically injected by the pipeline framework and is available as `spark` in every Python pipeline file — no initialization code is required. You can use `spark` directly without importing or constructing a `SparkSession`:
+
+```python
+from pyspark import pipelines as dp
+
+@dp.materialized_view
+def my_view():
+    return spark.range(10)
+```
+
+Previous versions of Declarative Pipelines required explicitly assigning the session with `spark = SparkSession.active()` at the top of each pipeline file. This is still allowed and continues to work correctly. However, if you do assign the session explicitly, `SparkSession.active()` is the only supported way to do so — any other method of obtaining or constructing a `SparkSession` is unsupported and may lead to unexpected behavior.

Review Comment:
Consider making the version boundary explicit. SDP shipped in 4.1, so readers on 4.1 or 4.2 are reading "previous versions" while still running a release where the explicit assignment is required. Something like:

> In Spark 4.1 and 4.2, every pipeline file had to declare `spark = SparkSession.active()` explicitly. Starting in Spark 4.3, the framework injects `spark` into each pipeline file's module namespace, so the explicit assignment is no longer required. Pipeline files that still include `spark = SparkSession.active()` continue to work correctly.

…would tell a 4.1/4.2 user exactly what they need to do, and tell a 4.3+ user that the boilerplate is gone.

Also, the sentence "any other method of obtaining or constructing a `SparkSession` is unsupported and may lead to unexpected behavior" is vague. A one-liner on the concrete failure mode (e.g. `SparkSession.builder.config(...).getOrCreate()` calls `_apply_options`, which mutates session config and trips `block_session_mutations`) would help users understand the constraint rather than just memorize it.

##########
sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/APITest.scala:
##########
@@ -374,6 +366,30 @@ trait APITest
   }
 
   /* Python Language Tests */
+  test("Python Pipeline with explicit spark assignment is backward compatible") {
+    val pipelineSpec =
+      TestPipelineSpec(include = Seq("transformations/**"))
+    val pipelineConfig = TestPipelineConfiguration(pipelineSpec)
+    val sources = Seq(
+      PipelineSourceFile(
+        name = "transformations/definition.py",
+        contents = """
+                     |from pyspark import pipelines as dp
+                     |from pyspark.sql import SparkSession
+                     |
+                     |spark = SparkSession.active()
+                     |
+                     |@dp.materialized_view
+                     |def mv():
+                     |  return spark.range(5)
+                     |""".stripMargin))
+    val pipeline = createAndRunPipeline(pipelineConfig, sources)
+    awaitPipelineTermination(pipeline)
+
+    checkAnswer(spark.sql(s"SELECT * FROM mv"), Seq(Row(0), Row(1), Row(2), Row(3), Row(4)))
+  }
+
+

Review Comment:
Nit: extra blank line between this test and the next one; the rest of the file uses a single blank line between tests.

```suggestion
```
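To make the two patterns from the first comment concrete, here is a minimal pure-Python sketch of name injection plus a mutation guard. It is not PySpark's implementation. `FakeSession`, `block_mutations`, and `load_pipeline_file` are hypothetical names. The guard only loosely models the idea behind `block_session_mutations` as the comment describes it: writing session config raises while a pipeline file is being evaluated. It does not reproduce `_apply_options` or the real builder.

```python
import types
from contextlib import contextmanager


class FakeSession:
    """Hypothetical stand-in for SparkSession; NOT the real PySpark class."""

    _active = None  # the single "active" session, set by the framework

    def __init__(self):
        self.conf = {}
        self.mutations_blocked = False

    @classmethod
    def active(cls):
        # Analogue of SparkSession.active(): hand back the existing session.
        return cls._active

    def set_conf(self, key, value):
        # Analogue of the config write a builder's options would perform.
        if self.mutations_blocked:
            raise RuntimeError(f"session mutation blocked: {key}")
        self.conf[key] = value


@contextmanager
def block_mutations(session):
    # Hypothetical analogue of a guard such as block_session_mutations.
    session.mutations_blocked = True
    try:
        yield
    finally:
        session.mutations_blocked = False


def load_pipeline_file(source, session):
    # Execute a pipeline file's source in a fresh module namespace that
    # already contains `spark`, so the file never has to define it.
    module = types.ModuleType("pipeline_file")
    module.spark = session
    # Make FakeSession visible, standing in for the usual SparkSession import.
    module.FakeSession = FakeSession
    with block_mutations(session):
        exec(source, module.__dict__)
    return module


session = FakeSession()
FakeSession._active = session

# Injected-name style: the file just uses `spark`.
implicit = load_pipeline_file("result = spark", session)
# Explicit-assignment style: rebinds `spark` to the very same object.
explicit = load_pipeline_file("spark = FakeSession.active()\nresult = spark", session)

# Any route that writes session config fails inside the guard.
try:
    load_pipeline_file("spark.set_conf('k', 'v')", session)
    blocked = False
except RuntimeError:
    blocked = True
```

In this model the explicit `spark = ...active()` line is harmless because it rebinds the name to the object that was already injected. Anything that writes session config fails with a clear error. That is the kind of concrete failure mode the doc sentence could name.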
