This is an automated email from the ASF dual-hosted git repository.

journey pushed a commit to branch 1.3.2-release in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/1.3.2-release by this push: new efef631 [Fix-3423][dao][sql]Fixed that the resource file of the task node can't be found when upgrade from 1.2.0 to 1.3.x (#3454) efef631 is described below commit efef631b68ef5fcac72c6ef2848aa6b84d718da7 Author: lgcareer <18610854...@163.com> AuthorDate: Mon Aug 10 19:05:05 2020 +0800 [Fix-3423][dao][sql]Fixed that the resource file of the task node can't be found when upgrade from 1.2.0 to 1.3.x (#3454) --- .../dao/upgrade/DolphinSchedulerManager.java | 2 + .../dolphinscheduler/dao/upgrade/ResourceDao.java | 67 ++++++++++++++++++++ .../dolphinscheduler/dao/upgrade/UpgradeDao.java | 71 ++++++++++++++++++++-- .../1.3.2_schema/mysql/dolphinscheduler_ddl.sql | 16 +++++ .../1.3.2_schema/mysql/dolphinscheduler_dml.sql | 19 ++++++ .../postgresql/dolphinscheduler_ddl.sql | 16 +++++ .../postgresql/dolphinscheduler_dml.sql | 17 ++++++ 7 files changed, 203 insertions(+), 5 deletions(-) diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/DolphinSchedulerManager.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/DolphinSchedulerManager.java index 8d1d862..b2daae2 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/DolphinSchedulerManager.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/DolphinSchedulerManager.java @@ -117,6 +117,8 @@ public class DolphinSchedulerManager { upgradeDao.upgradeDolphinScheduler(schemaDir); if ("1.3.0".equals(schemaVersion)) { upgradeDao.upgradeDolphinSchedulerWorkerGroup(); + } else if ("1.3.2".equals(schemaVersion)) { + upgradeDao.upgradeDolphinSchedulerResourceList(); } version = schemaVersion; } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ResourceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ResourceDao.java new file mode 100644 index 
0000000..b47971e --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ResourceDao.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.upgrade; + +import org.apache.dolphinscheduler.common.utils.ConnectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.util.HashMap; +import java.util.Map; + +/** + * resource dao + */ +public class ResourceDao { + public static final Logger logger = LoggerFactory.getLogger(ProcessDefinitionDao.class); + + /** + * list all resources + * @param conn connection + * @return map that key is full_name and value is id + */ + Map<String,Integer> listAllResources(Connection conn){ + Map<String,Integer> resourceMap = new HashMap<>(); + + String sql = String.format("SELECT id,full_name FROM t_ds_resources"); + ResultSet rs = null; + PreparedStatement pstmt = null; + try { + pstmt = conn.prepareStatement(sql); + rs = pstmt.executeQuery(); + + while (rs.next()){ + Integer id = rs.getInt(1); + String fullName = rs.getString(2); + resourceMap.put(fullName,id); + } + + } 
catch (Exception e) { + logger.error(e.getMessage(),e); + throw new RuntimeException("sql: " + sql, e); + } finally { + ConnectionUtils.releaseResource(rs, pstmt, conn); + } + + return resourceMap; + } + +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java index f9458a8..59c2cbc 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java @@ -19,10 +19,8 @@ package org.apache.dolphinscheduler.dao.upgrade; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import org.apache.dolphinscheduler.common.enums.DbType; -import org.apache.dolphinscheduler.common.utils.ConnectionUtils; -import org.apache.dolphinscheduler.common.utils.SchemaUtils; -import org.apache.dolphinscheduler.common.utils.ScriptRunner; -import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.common.process.ResourceInfo; +import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.dao.AbstractBaseDao; import org.apache.dolphinscheduler.dao.datasource.ConnectionFactory; import org.slf4j.Logger; @@ -36,7 +34,9 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.text.MessageFormat; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; public abstract class UpgradeDao extends AbstractBaseDao { @@ -270,6 +270,15 @@ public abstract class UpgradeDao extends AbstractBaseDao { public void upgradeDolphinSchedulerWorkerGroup() { updateProcessDefinitionJsonWorkerGroup(); } + + /** + * upgrade DolphinScheduler resource list + * ds-1.3.2 modify the resource list for process definition json + */ + public void upgradeDolphinSchedulerResourceList() { + 
updateProcessDefinitionJsonResourceList(); + } + /** * updateProcessDefinitionJsonWorkerGroup */ @@ -288,7 +297,7 @@ public abstract class UpgradeDao extends AbstractBaseDao { for (int i = 0 ;i < tasks.size() ; i++){ JSONObject task = tasks.getJSONObject(i); Integer workerGroupId = task.getInteger("workerGroupId"); - if (workerGroupId == -1) { + if (workerGroupId == null || workerGroupId == -1) { task.put("workerGroup", "default"); }else { task.put("workerGroup", oldWorkerGroupMap.get(workerGroupId)); @@ -311,6 +320,58 @@ public abstract class UpgradeDao extends AbstractBaseDao { } /** + * updateProcessDefinitionJsonResourceList + */ + protected void updateProcessDefinitionJsonResourceList(){ + ResourceDao resourceDao = new ResourceDao(); + ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao(); + Map<Integer,String> replaceProcessDefinitionMap = new HashMap<>(); + try { + Map<String,Integer> resourcesMap = resourceDao.listAllResources(dataSource.getConnection()); + Map<Integer,String> processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection()); + + for (Map.Entry<Integer,String> entry : processDefinitionJsonMap.entrySet()){ + JSONObject jsonObject = JSONObject.parseObject(entry.getValue()); + JSONArray tasks = JSONArray.parseArray(jsonObject.getString("tasks")); + + for (int i = 0 ;i < tasks.size() ; i++){ + JSONObject task = tasks.getJSONObject(i); + JSONObject param = (JSONObject) task.get("params"); + if (param != null) { + + List<ResourceInfo> resourceList = JSONUtils.toList(param.getString("resourceList"), ResourceInfo.class); + + if (CollectionUtils.isNotEmpty(resourceList)) { + List<ResourceInfo> newResourceList = resourceList.stream().map(resInfo -> { + String fullName = resInfo.getRes().startsWith("/") ? 
resInfo.getRes() : String.format("/%s",resInfo.getRes()); + if (resInfo.getId() == 0 && resourcesMap.containsKey(fullName)) { + resInfo.setId(resourcesMap.get(fullName)); + } + return resInfo; + }).collect(Collectors.toList()); + param.put("resourceList",JSONArray.parse(JSONObject.toJSONString(newResourceList))); + } + } + task.put("params",param); + + } + + jsonObject.remove(jsonObject.getString("tasks")); + + jsonObject.put("tasks",tasks); + + replaceProcessDefinitionMap.put(entry.getKey(),jsonObject.toJSONString()); + } + if (replaceProcessDefinitionMap.size() > 0){ + processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(),replaceProcessDefinitionMap); + } + }catch (Exception e){ + logger.error("update process definition json resource list error",e); + } + + } + + /** * upgradeDolphinScheduler DML * @param schemaDir schemaDir */ diff --git a/sql/upgrade/1.3.2_schema/mysql/dolphinscheduler_ddl.sql b/sql/upgrade/1.3.2_schema/mysql/dolphinscheduler_ddl.sql new file mode 100644 index 0000000..38964cc --- /dev/null +++ b/sql/upgrade/1.3.2_schema/mysql/dolphinscheduler_ddl.sql @@ -0,0 +1,16 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ \ No newline at end of file diff --git a/sql/upgrade/1.3.2_schema/mysql/dolphinscheduler_dml.sql b/sql/upgrade/1.3.2_schema/mysql/dolphinscheduler_dml.sql new file mode 100644 index 0000000..383d8a4 --- /dev/null +++ b/sql/upgrade/1.3.2_schema/mysql/dolphinscheduler_dml.sql @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +SET sql_mode=(SELECT REPLACE(@@sql_mode,'ONLY_FULL_GROUP_BY','')); +SET FOREIGN_KEY_CHECKS=0; +UPDATE t_ds_user SET phone = '' WHERE phone = 'xx'; \ No newline at end of file diff --git a/sql/upgrade/1.3.2_schema/postgresql/dolphinscheduler_ddl.sql b/sql/upgrade/1.3.2_schema/postgresql/dolphinscheduler_ddl.sql new file mode 100644 index 0000000..38964cc --- /dev/null +++ b/sql/upgrade/1.3.2_schema/postgresql/dolphinscheduler_ddl.sql @@ -0,0 +1,16 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ \ No newline at end of file diff --git a/sql/upgrade/1.3.2_schema/postgresql/dolphinscheduler_dml.sql b/sql/upgrade/1.3.2_schema/postgresql/dolphinscheduler_dml.sql new file mode 100644 index 0000000..bf043ad --- /dev/null +++ b/sql/upgrade/1.3.2_schema/postgresql/dolphinscheduler_dml.sql @@ -0,0 +1,17 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +UPDATE t_ds_user SET phone = '' WHERE phone = 'xx'; \ No newline at end of file