This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new b1edce2  [python] Add checker pd attr param and fix switch example (#7818)
b1edce2 is described below

commit b1edce2b113ae0e79770e7b5a9d1e005e8784f7f
Author: Jiajie Zhong <[email protected]>
AuthorDate: Wed Jan 5 19:59:20 2022 +0800

    [python] Add checker pd attr param and fix switch example (#7818)
    
    * [python] Add checker pd attr param and fix switch example
    
    * Correct submit self.param to java gateway
    * Fix missing parameter for switch example
    * Add mechanism checker before submit to java gateway
    
    close: #7803
    
    * Fix mock get task code and version
    
    * Change judge statement and add comment
---
 .../examples/task_switch_example.py                |  3 +-
 .../pydolphinscheduler/core/process_definition.py  | 38 ++++++++++-
 .../tests/core/test_process_definition.py          | 77 ++++++++++++++++++++++
 3 files changed, 114 insertions(+), 4 deletions(-)

diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py b/dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py
index b47b8e3..2b30d0a 100644
--- a/dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py
@@ -34,8 +34,7 @@ from pydolphinscheduler.tasks.shell import Shell
 from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition
 
 with ProcessDefinition(
-    name="task_switch_example",
-    tenant="tenant_exists",
+    name="task_switch_example", tenant="tenant_exists", param={"var": "1"}
 ) as pd:
     parent = Shell(name="parent", command="echo parent")
     switch_child_1 = Shell(name="switch_child_1", command="echo switch_child_1")
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
index 70d4e6b..4941a85 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
@@ -24,6 +24,7 @@ from typing import Any, Dict, List, Optional, Set
 from pydolphinscheduler.constants import (
     ProcessDefinitionDefault,
     ProcessDefinitionReleaseState,
+    TaskType,
 )
 from pydolphinscheduler.core.base import Base
 from pydolphinscheduler.exceptions import PyDSParamException, PyDSTaskNoFoundException
@@ -97,7 +98,7 @@ class ProcessDefinition(Base):
         worker_group: Optional[str] = ProcessDefinitionDefault.WORKER_GROUP,
         timeout: Optional[int] = 0,
         release_state: Optional[str] = ProcessDefinitionReleaseState.ONLINE,
-        param: Optional[List] = None,
+        param: Optional[Dict] = None,
     ):
         super().__init__(name, description)
         self.schedule = schedule
@@ -190,6 +191,22 @@ class ProcessDefinition(Base):
         self._end_time = val
 
     @property
+    def param_json(self) -> Optional[List[Dict]]:
+        """Return param json base on self.param."""
+        # Handle empty dict and None value
+        if not self.param:
+            return None
+        return [
+            {
+                "prop": k,
+                "direct": "IN",
+                "type": "VARCHAR",
+                "value": v,
+            }
+            for k, v in self.param.items()
+        ]
+
+    @property
     def task_definition_json(self) -> List[Dict]:
         """Return all tasks definition in list of dict."""
         if not self.tasks:
@@ -323,16 +340,33 @@ class ProcessDefinition(Base):
         # Project model need User object exists
         self.project.create_if_not_exists(self._user)
 
+    def _pre_submit_check(self):
+        """Check specific condition satisfy before.
+
+        This method should be called before process definition submit to java gateway
+        For now, we have below checker:
+        * `self.param` should be set if task `switch` in this workflow.
+        """
+        if (
+            any([task.task_type == TaskType.SWITCH for task in self.tasks.values()])
+            and self.param is None
+        ):
+            raise PyDSParamException(
+                "Parameter param must be provider if task Switch in process definition."
+            )
+
     def submit(self) -> int:
         """Submit ProcessDefinition instance to java gateway."""
         self._ensure_side_model_exists()
+        self._pre_submit_check()
+
         gateway = launch_gateway()
         self._process_definition_code = gateway.entry_point.createOrUpdateProcessDefinition(
             self._user,
             self._project,
             self.name,
             str(self.description) if self.description else "",
-            str(self.param) if self.param else None,
+            json.dumps(self.param_json),
             json.dumps(self.schedule_json) if self.schedule_json else None,
             json.dumps(self.task_location),
             self.timeout,
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
index 8491878..694f9e4 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
@@ -19,6 +19,7 @@
 
 from datetime import datetime
 from typing import Any
+from unittest.mock import patch
 
 import pytest
 from freezegun import freeze_time
@@ -30,10 +31,12 @@ from pydolphinscheduler.constants import (
 from pydolphinscheduler.core.process_definition import ProcessDefinition
 from pydolphinscheduler.exceptions import PyDSParamException
 from pydolphinscheduler.side import Project, Tenant, User
+from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition
 from pydolphinscheduler.utils.date import conv_to_schedule
 from tests.testing.task import Task
 
 TEST_PROCESS_DEFINITION_NAME = "simple-test-process-definition"
+TEST_TASK_TYPE = "test-task-type"
 
 
 @pytest.mark.parametrize("func", ["run", "submit", "start"])
@@ -151,6 +154,80 @@ def test__parse_datetime_not_support_type(val: Any):
             pd._parse_datetime(val)
 
 
+@pytest.mark.parametrize(
+    "param, expect",
+    [
+        (
+            None,
+            None,
+        ),
+        (
+            {},
+            None,
+        ),
+        (
+            {"key1": "val1"},
+            [
+                {
+                    "prop": "key1",
+                    "direct": "IN",
+                    "type": "VARCHAR",
+                    "value": "val1",
+                }
+            ],
+        ),
+        (
+            {
+                "key1": "val1",
+                "key2": "val2",
+            },
+            [
+                {
+                    "prop": "key1",
+                    "direct": "IN",
+                    "type": "VARCHAR",
+                    "value": "val1",
+                },
+                {
+                    "prop": "key2",
+                    "direct": "IN",
+                    "type": "VARCHAR",
+                    "value": "val2",
+                },
+            ],
+        ),
+    ],
+)
+def test_property_param_json(param, expect):
+    """Test ProcessDefinition's property param_json."""
+    pd = ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, param=param)
+    assert pd.param_json == expect
+
+
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+def test__pre_submit_check_switch_without_param(mock_code_version):
+    """Test :func:`_pre_submit_check` if process definition with switch but without attribute param."""
+    with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
+        parent = Task(name="parent", task_type=TEST_TASK_TYPE)
+        switch_child_1 = Task(name="switch_child_1", task_type=TEST_TASK_TYPE)
+        switch_child_2 = Task(name="switch_child_2", task_type=TEST_TASK_TYPE)
+        switch_condition = SwitchCondition(
+            Branch(condition="${var} > 1", task=switch_child_1),
+            Default(task=switch_child_2),
+        )
+
+        switch = Switch(name="switch", condition=switch_condition)
+        parent >> switch
+        with pytest.raises(
+            PyDSParamException,
+            match="Parameter param must be provider if task Switch in process definition.",
+        ):
+            pd._pre_submit_check()
+
+
 def test_process_definition_get_define_without_task():
     """Test process definition function get_define without task."""
     expect = {

Reply via email to