This is an automated email from the ASF dual-hosted git repository. zhongjiajie pushed a commit to branch 4.0.4-prepare in repository https://gitbox.apache.org/repos/asf/dolphinscheduler-sdk-python.git
commit 741303556516571cbbc75b5fb09e89efa3b160d4 Author: Jay Chung <[email protected]> AuthorDate: Thu Oct 12 19:49:40 2023 +0800 feat: Add task group to task class (#114) ```py extract = Shell( name="extract", command="echo 'Some extract command here'", task_group_id=1, task_group_priority=123 ) ``` fix: #106 --- docs/source/concept.rst | 35 +++++++++++++++++++++++++++++++++++ src/pydolphinscheduler/core/task.py | 9 +++++++++ tests/core/test_engine.py | 2 ++ tests/core/test_task.py | 2 ++ 4 files changed, 48 insertions(+) diff --git a/docs/source/concept.rst b/docs/source/concept.rst index 0d25ea5..ab8df5e 100644 --- a/docs/source/concept.rst +++ b/docs/source/concept.rst @@ -193,6 +193,41 @@ decide workflow of task. You could set `workflow` in both normal assign or in co With both `Workflow`_, `Tasks`_ and `Tasks Dependence`_, we could build a workflow with multiple tasks. +Task Group +~~~~~~~~~~ + +A task group can manage and control the maximum number of concurrently running tasks. This is particularly +useful when you want to limit the simultaneous execution of various task types. For instance, in an ETL +(Extract, Transform, Load) job where data is extracted from a source database, it's crucial to control the +parallelism of extract tasks to prevent an excessive number of connections to the source database. This is +where a task group comes into play. There are two key parameters, ``task_group_id`` and ``task_group_priority``, +that determine the behavior of the task group. + +Task group can control the maximum number of tasks running at the same time. It is useful when you don't want +to run too many types of tasks at the same time. For example, when you extract data from a source database in an ELT +job, you want to control the parallelism of extract tasks to avoid too many connections to the source database. +Then task group can help you. There are two major parameters ``task_group_id`` and ``task_group_priority`` +to control the behavior of task group. 
+ +* ``task_group_id``: is an integer used to identify the task group. You can set a ``task_group_id`` to + restrict the parallelism of tasks. The ``task_group_id`` can be found in the DolphinScheduler web UI. The + default value is ``0``, which means there are no restrictions for this task group. +* ``task_group_priority``: is an integer used to define the priority of the task group. When different tasks + share the same ``task_group_id``, the task group's priority comes into play, controlling the order in which + they run. Higher values indicate higher priority. The default value is ``0``, which means there's no + specific priority for this task group, and tasks will run in the order they were created. + +Here's an example in Python: + +.. code-block:: python + + extract = Shell( + name="extract", + command="echo 'Some extract command here'", + task_group_id=1, + task_group_priority=123 + ) + Resource Files -------------- diff --git a/src/pydolphinscheduler/core/task.py b/src/pydolphinscheduler/core/task.py index c4a0531..b0f27e4 100644 --- a/src/pydolphinscheduler/core/task.py +++ b/src/pydolphinscheduler/core/task.py @@ -91,6 +91,9 @@ class Task(Base): :param task_priority: default TaskPriority.MEDIUM :param worker_group: default configuration.WORKFLOW_WORKER_GROUP :param environment_name: default None + :param task_group_id: Identifier of the task group used to restrict the parallelism of task instance runs, default 0. + :param task_group_priority: Priority within the same task group; the higher the value, the higher the + priority, default 0. 
:param delay_time: default 0 :param fail_retry_times: default 0 :param fail_retry_interval: default 1 @@ -118,6 +121,8 @@ class Task(Base): "delay_time", "fail_retry_times", "fail_retry_interval", + "task_group_id", + "task_group_priority", "timeout_flag", "timeout_notify_strategy", "timeout", @@ -150,6 +155,8 @@ class Task(Base): task_priority: Optional[str] = TaskPriority.MEDIUM, worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP, environment_name: Optional[str] = None, + task_group_id: Optional[int] = 0, + task_group_priority: Optional[int] = 0, delay_time: Optional[int] = 0, fail_retry_times: Optional[int] = 0, fail_retry_interval: Optional[int] = 1, @@ -172,6 +179,8 @@ class Task(Base): self.task_priority = task_priority self.worker_group = worker_group self._environment_name = environment_name + self.task_group_id = task_group_id + self.task_group_priority = task_group_priority self.fail_retry_times = fail_retry_times self.fail_retry_interval = fail_retry_interval self.delay_time = delay_time diff --git a/tests/core/test_engine.py b/tests/core/test_engine.py index ba44fad..f7d97df 100644 --- a/tests/core/test_engine.py +++ b/tests/core/test_engine.py @@ -108,6 +108,8 @@ def test_property_task_params(mock_resource, mock_code_version, attr, expect): "version": 1, "description": None, "delayTime": 0, + "taskGroupId": 0, + "taskGroupPriority": 0, "taskType": "test-engine", "taskParams": { "mainClass": "org.apache.examples.mock.Mock", diff --git a/tests/core/test_task.py b/tests/core/test_task.py index d7aeb81..9781b6e 100644 --- a/tests/core/test_task.py +++ b/tests/core/test_task.py @@ -243,6 +243,8 @@ def test_task_get_define(): "version": version, "description": None, "delayTime": 0, + "taskGroupId": 0, + "taskGroupPriority": 0, "taskType": task_type, "taskParams": { "resourceList": [],
