This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new fcbb5f4 [python] Enhance task datax example (#7801)
fcbb5f4 is described below
commit fcbb5f4d8f337236de1aab17a2d1f00b51c2d535
Author: Jiajie Zhong <[email protected]>
AuthorDate: Wed Jan 5 19:57:28 2022 +0800
[python] Enhance task datax example (#7801)
* [python] Enhance task datax example
* Add full example for `CustomDataX.json`
* Add comment noting that the datasources need to exist.
close: #7800
* Add missing parameter setting
---
.../examples/task_datax_example.py | 50 ++++++++++++++++++++--
1 file changed, 47 insertions(+), 3 deletions(-)
diff --git
a/dolphinscheduler-python/pydolphinscheduler/examples/task_datax_example.py
b/dolphinscheduler-python/pydolphinscheduler/examples/task_datax_example.py
index 1832921..cdc7f0f 100644
--- a/dolphinscheduler-python/pydolphinscheduler/examples/task_datax_example.py
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/task_datax_example.py
@@ -29,7 +29,48 @@ from pydolphinscheduler.core.process_definition import
ProcessDefinition
from pydolphinscheduler.tasks.datax import CustomDataX, DataX
# datax json template
-JSON_TEMPLATE = ""
+JSON_TEMPLATE = {
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "mysqlreader",
+ "parameter": {
+ "username": "usr",
+ "password": "pwd",
+ "column": ["id", "name", "code", "description"],
+ "splitPk": "id",
+ "connection": [
+ {
+ "table": ["source_table"],
+ "jdbcUrl":
["jdbc:mysql://127.0.0.1:3306/source_db"],
+ }
+ ],
+ },
+ },
+ "writer": {
+ "name": "mysqlwriter",
+ "parameter": {
+ "writeMode": "insert",
+ "username": "usr",
+ "password": "pwd",
+ "column": ["id", "name"],
+ "connection": [
+ {
+ "jdbcUrl":
"jdbc:mysql://127.0.0.1:3306/target_db",
+ "table": ["target_table"],
+ }
+ ],
+ },
+ },
+ }
+ ],
+ "setting": {
+ "errorLimit": {"percentage": 0, "record": 0},
+ "speed": {"channel": 1, "record": 1000},
+ },
+ }
+}
with ProcessDefinition(
name="task_datax_example",
@@ -37,6 +78,8 @@ with ProcessDefinition(
) as pd:
# This task synchronizes the data in `t_ds_project`
# of `first_mysql` database to `target_project` of `second_mysql` database.
+ # You have to make sure data sources named `first_mysql` and `second_mysql`
exist
+ # in your environment.
task1 = DataX(
name="task_datax",
datasource_name="first_mysql",
@@ -45,6 +88,7 @@ with ProcessDefinition(
target_table="target_table",
)
- # you can custom json_template of datax to sync data.
- task2 = CustomDataX(name="task_custom_datax", json=JSON_TEMPLATE)
+ # You can customize json_template of datax to sync data. This task creates a
new
+ # datax job same as task1, transferring records from `first_mysql` to
`second_mysql`.
+ task2 = CustomDataX(name="task_custom_datax", json=str(JSON_TEMPLATE))
pd.run()