This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 81c29eef4ad3  [SPARK-52238][SDP] Rename Pipeline Spec Field "definitions" to "libraries"
81c29eef4ad3 is described below

commit 81c29eef4ad3bc1cab187bc6bf9fe1e5ab51e7c6
Author: Jacky Wang <jacky.w...@databricks.com>
AuthorDate: Wed Sep 10 16:45:55 2025 +0800

    [SPARK-52238][SDP] Rename Pipeline Spec Field "definitions" to "libraries"

    ### What changes were proposed in this pull request?

    Rename the pipeline spec field from "definitions" to "libraries". This field allows users to include pipeline source code files.

    ```diff
    name: libraries-test
    - definitions:
    + libraries:
      - glob:
          include: transformations/**/*.py
      - glob:
          include: transformations/**/*.sql
    ```

    ### Why are the changes needed?

    This opens up the possibility of adding other types of dependencies to a pipeline execution, such as Python wheels. `libraries` is a more general term.

    ### Does this PR introduce _any_ user-facing change?

    Yes, but SDP has not been released yet.

    ### How was this patch tested?

    Existing tests.

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #52294 from JiaqiWang18/rename-spec-field-libraries.

    Authored-by: Jacky Wang <jacky.w...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 python/pyspark/pipelines/cli.py            | 20 ++++++++--------
 python/pyspark/pipelines/init_cli.py       |  2 +-
 python/pyspark/pipelines/tests/test_cli.py | 38 +++++++++++++++---------------
 3 files changed, 30 insertions(+), 30 deletions(-)

diff --git a/python/pyspark/pipelines/cli.py b/python/pyspark/pipelines/cli.py
index 43f9ae150f3f..dcfda1959a2e 100644
--- a/python/pyspark/pipelines/cli.py
+++ b/python/pyspark/pipelines/cli.py
@@ -52,8 +52,8 @@ PIPELINE_SPEC_FILE_NAMES = ["pipeline.yaml", "pipeline.yml"]


 @dataclass(frozen=True)
-class DefinitionsGlob:
-    """A glob pattern for finding pipeline definitions files."""
+class LibrariesGlob:
+    """A glob pattern for finding pipeline source codes."""

     include: str

@@ -66,14 +66,14 @@ class PipelineSpec:
     :param catalog: The default catalog to use for the pipeline.
     :param database: The default database to use for the pipeline.
     :param configuration: A dictionary of Spark configuration properties to set for the pipeline.
-    :param definitions: A list of glob patterns for finding pipeline definitions files.
+    :param libraries: A list of glob patterns for finding pipeline source codes.
""" name: str catalog: Optional[str] database: Optional[str] configuration: Mapping[str, str] - definitions: Sequence[DefinitionsGlob] + libraries: Sequence[LibrariesGlob] def find_pipeline_spec(current_dir: Path) -> Path: @@ -113,7 +113,7 @@ def load_pipeline_spec(spec_path: Path) -> PipelineSpec: def unpack_pipeline_spec(spec_data: Mapping[str, Any]) -> PipelineSpec: - ALLOWED_FIELDS = {"name", "catalog", "database", "schema", "configuration", "definitions"} + ALLOWED_FIELDS = {"name", "catalog", "database", "schema", "configuration", "libraries"} REQUIRED_FIELDS = ["name"] for key in spec_data.keys(): if key not in ALLOWED_FIELDS: @@ -133,9 +133,9 @@ def unpack_pipeline_spec(spec_data: Mapping[str, Any]) -> PipelineSpec: catalog=spec_data.get("catalog"), database=spec_data.get("database", spec_data.get("schema")), configuration=validate_str_dict(spec_data.get("configuration", {}), "configuration"), - definitions=[ - DefinitionsGlob(include=entry["glob"]["include"]) - for entry in spec_data.get("definitions", []) + libraries=[ + LibrariesGlob(include=entry["glob"]["include"]) + for entry in spec_data.get("libraries", []) ], ) @@ -178,8 +178,8 @@ def register_definitions( with change_dir(path): with graph_element_registration_context(registry): log_with_curr_timestamp(f"Loading definitions. Root directory: '{path}'.") - for definition_glob in spec.definitions: - glob_expression = definition_glob.include + for libraries_glob in spec.libraries: + glob_expression = libraries_glob.include matching_files = [p for p in path.glob(glob_expression) if p.is_file()] log_with_curr_timestamp( f"Found {len(matching_files)} files matching glob '{glob_expression}'" diff --git a/python/pyspark/pipelines/init_cli.py b/python/pyspark/pipelines/init_cli.py index 227e5aa5deca..89b998bd4f32 100644 --- a/python/pyspark/pipelines/init_cli.py +++ b/python/pyspark/pipelines/init_cli.py @@ -19,7 +19,7 @@ from pathlib import Path SPEC = """ name: {{ name }} -definitions: +libraries: - glob: include: transformations/**/*.py - glob: diff --git a/python/pyspark/pipelines/tests/test_cli.py b/python/pyspark/pipelines/tests/test_cli.py index 8055723ddc5a..fc238fac1786 100644 --- a/python/pyspark/pipelines/tests/test_cli.py +++ b/python/pyspark/pipelines/tests/test_cli.py @@ -34,7 +34,7 @@ if should_test_connect and have_yaml: load_pipeline_spec, register_definitions, unpack_pipeline_spec, - DefinitionsGlob, + LibrariesGlob, PipelineSpec, run, ) @@ -58,7 +58,7 @@ class CLIUtilityTests(unittest.TestCase): "key1": "value1", "key2": "value2" }, - "definitions": [ + "libraries": [ {"glob": {"include": "test_include"}} ] } @@ -70,8 +70,8 @@ class CLIUtilityTests(unittest.TestCase): assert spec.catalog == "test_catalog" assert spec.database == "test_database" assert spec.configuration == {"key1": "value1", "key2": "value2"} - assert len(spec.definitions) == 1 - assert spec.definitions[0].include == "test_include" + assert len(spec.libraries) == 1 + assert spec.libraries[0].include == "test_include" def test_load_pipeline_spec_name_is_required(self): with tempfile.NamedTemporaryFile(mode="w") as tmpfile: @@ -84,7 +84,7 @@ class CLIUtilityTests(unittest.TestCase): "key1": "value1", "key2": "value2" }, - "definitions": [ + "libraries": [ {"glob": {"include": "test_include"}} ] } @@ -110,7 +110,7 @@ class CLIUtilityTests(unittest.TestCase): "key1": "value1", "key2": "value2" }, - "definitions": [ + "libraries": [ {"glob": {"include": "test_include"}} ] } @@ -121,8 +121,8 @@ class CLIUtilityTests(unittest.TestCase): assert 
spec.catalog == "test_catalog" assert spec.database == "test_database" assert spec.configuration == {"key1": "value1", "key2": "value2"} - assert len(spec.definitions) == 1 - assert spec.definitions[0].include == "test_include" + assert len(spec.libraries) == 1 + assert spec.libraries[0].include == "test_include" def test_load_pipeline_spec_invalid(self): with tempfile.NamedTemporaryFile(mode="w") as tmpfile: @@ -134,7 +134,7 @@ class CLIUtilityTests(unittest.TestCase): "key1": "value1", "key2": "value2" }, - "definitions": [ + "libraries": [ {"glob": {"include": "test_include"}} ] } @@ -150,7 +150,7 @@ class CLIUtilityTests(unittest.TestCase): def test_unpack_empty_pipeline_spec(self): empty_spec = PipelineSpec( - name="test_pipeline", catalog=None, database=None, configuration={}, definitions=[] + name="test_pipeline", catalog=None, database=None, configuration={}, libraries=[] ) self.assertEqual(unpack_pipeline_spec({"name": "test_pipeline"}), empty_spec) @@ -176,7 +176,7 @@ class CLIUtilityTests(unittest.TestCase): { "catalog": "test_catalog", "configuration": {}, - "definitions": [] + "libraries": [] } """ ) @@ -193,7 +193,7 @@ class CLIUtilityTests(unittest.TestCase): { "catalog": "test_catalog", "configuration": {}, - "definitions": [] + "libraries": [] } """ ) @@ -226,7 +226,7 @@ class CLIUtilityTests(unittest.TestCase): { "catalog": "test_catalog", "configuration": {}, - "definitions": [] + "libraries": [] } """ ) @@ -240,7 +240,7 @@ class CLIUtilityTests(unittest.TestCase): catalog=None, database=None, configuration={}, - definitions=[DefinitionsGlob(include="subdir1/*")], + libraries=[LibrariesGlob(include="subdir1/*")], ) with tempfile.TemporaryDirectory() as temp_dir: outer_dir = Path(temp_dir) @@ -248,7 +248,7 @@ class CLIUtilityTests(unittest.TestCase): subdir1.mkdir() subdir2 = outer_dir / "subdir2" subdir2.mkdir() - with (subdir1 / "definitions.py").open("w") as f: + with (subdir1 / "libraries.py").open("w") as f: f.write( textwrap.dedent( """ @@ -260,7 +260,7 @@ class CLIUtilityTests(unittest.TestCase): ) ) - with (subdir2 / "definitions.py").open("w") as f: + with (subdir2 / "libraries.py").open("w") as f: f.write( textwrap.dedent( """ @@ -283,7 +283,7 @@ class CLIUtilityTests(unittest.TestCase): catalog=None, database=None, configuration={}, - definitions=[DefinitionsGlob(include="*")], + libraries=[LibrariesGlob(include="*")], ) with tempfile.TemporaryDirectory() as temp_dir: outer_dir = Path(temp_dir) @@ -301,7 +301,7 @@ class CLIUtilityTests(unittest.TestCase): catalog=None, database=None, configuration={}, - definitions=[DefinitionsGlob(include="*")], + libraries=[LibrariesGlob(include="*")], ) with tempfile.TemporaryDirectory() as temp_dir: outer_dir = Path(temp_dir) @@ -355,7 +355,7 @@ class CLIUtilityTests(unittest.TestCase): catalog=None, database=None, configuration={}, - definitions=[DefinitionsGlob(include="defs.py")], + libraries=[LibrariesGlob(include="defs.py")], ), ) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org