This is an automated email from the ASF dual-hosted git repository.
kerwin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 7c5bebe [Fix-8544][API] The folder's size can't be updated when
creating, updating or deleting a resource file in the folder. (#9107)
7c5bebe is described below
commit 7c5bebea98b64394a74960a5fa0e7a40af26c465
Author: calvin <[email protected]>
AuthorDate: Wed Mar 23 18:58:41 2022 +0800
[Fix-8544][API] The folder's size can't be updated when creating, updating
or deleting a resource file in the folder. (#9107)
---
.../api/service/impl/ResourcesServiceImpl.java | 42 +++++++++-
.../tools/datasource/DolphinSchedulerManager.java | 8 +-
.../tools/datasource/dao/ResourceDao.java | 93 ++++++++++++++++++++++
.../tools/datasource/dao/UpgradeDao.java | 17 +++-
4 files changed, 157 insertions(+), 3 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
index 398d182..7d43c58 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
@@ -33,7 +33,6 @@ import org.apache.dolphinscheduler.api.utils.RegexUtils;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ProgramType;
-import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@@ -50,6 +49,7 @@ import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.dao.utils.ResourceProcessDefinitionUtils;
+import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.commons.beanutils.BeanMap;
import org.apache.commons.collections.CollectionUtils;
@@ -81,6 +81,7 @@ import org.springframework.web.multipart.MultipartFile;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.databind.SerializationFeature;
+import com.google.common.base.Joiner;
import com.google.common.io.Files;
/**
@@ -221,6 +222,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl
implements ResourcesSe
try {
resourcesMapper.insert(resource);
+ updateParentResourceSize(resource, resource.getSize());
putMsg(result, Status.SUCCESS);
Map<Object, Object> dataMap = new BeanMap(resource);
Map<String, Object> resultMap = new HashMap<>();
@@ -245,6 +247,33 @@ public class ResourcesServiceImpl extends BaseServiceImpl
implements ResourcesSe
}
/**
+ * add size to the stored size of every ancestor folder of the resource that queryResource finds, clamping each folder's size at 0
+ * NOTE(review): the guard checks resource.getSize() rather than size, so nothing is updated when the resource's own size is 0 - confirm this is intended
+ * @param resource the current resource; its full name is split on "/" to derive the ancestor folder names
+ * @param size the amount added to each ancestor folder's size (callers pass a negative value to shrink it)
+ */
+ private void updateParentResourceSize(Resource resource, long size) {
+ if (resource.getSize() > 0) {
+ String[] splits = resource.getFullName().split("/");
+ for (int i = 1; i < splits.length; i++) {
+ String parentFullName =
Joiner.on("/").join(Arrays.copyOfRange(splits, 0, i));
+ if (StringUtils.isNotBlank(parentFullName)) {
+ List<Resource> resources =
resourcesMapper.queryResource(parentFullName, resource.getType().ordinal());
+ if (CollectionUtils.isNotEmpty(resources)) {
+ Resource parentResource = resources.get(0);
+ if (parentResource.getSize() + size >= 0) {
+ parentResource.setSize(parentResource.getSize() +
size);
+ } else {
+ parentResource.setSize(0L);
+ }
+ resourcesMapper.updateById(parentResource);
+ }
+ }
+ }
+ }
+ }
+
+ /**
* check resource is exists
*
* @param fullName fullName
@@ -360,6 +389,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl
implements ResourcesSe
// updateResource data
Date now = new Date();
+ long originFileSize = resource.getSize();
resource.setAlias(name);
resource.setFileName(name);
@@ -445,6 +475,8 @@ public class ResourcesServiceImpl extends BaseServiceImpl
implements ResourcesSe
throw new ServiceException(String.format("delete resource:
%s failed.", originFullName));
}
}
+
+ updateParentResourceSize(resource, resource.getSize() -
originFileSize);
return result;
}
@@ -727,11 +759,15 @@ public class ResourcesServiceImpl extends BaseServiceImpl
implements ResourcesSe
String hdfsFilename = HadoopUtils.getHdfsFileName(resource.getType(),
tenantCode, resource.getFullName());
//delete data in database
+
resourcesMapper.selectBatchIds(Arrays.asList(needDeleteResourceIdArray)).forEach(item
-> {
+ updateParentResourceSize(item, item.getSize() * -1);
+ });
resourcesMapper.deleteIds(needDeleteResourceIdArray);
resourceUserMapper.deleteResourceUserArray(0,
needDeleteResourceIdArray);
//delete file on hdfs
HadoopUtils.getInstance().delete(hdfsFilename, true);
+
putMsg(result, Status.SUCCESS);
return result;
@@ -941,6 +977,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl
implements ResourcesSe
Resource resource = new
Resource(pid,name,fullName,false,desc,name,loginUser.getId(),type,content.getBytes().length,now,now);
resourcesMapper.insert(resource);
+ updateParentResourceSize(resource, resource.getSize());
putMsg(result, Status.SUCCESS);
Map<Object, Object> dataMap = new BeanMap(resource);
@@ -1035,10 +1072,13 @@ public class ResourcesServiceImpl extends
BaseServiceImpl implements ResourcesSe
if (StringUtils.isEmpty(tenantCode)) {
return result;
}
+ long originFileSize = resource.getSize();
resource.setSize(content.getBytes().length);
resource.setUpdateTime(new Date());
resourcesMapper.updateById(resource);
+ updateParentResourceSize(resource, resource.getSize() -
originFileSize);
+
result = uploadContentToHdfs(resource.getFullName(), tenantCode,
content);
if (!result.getCode().equals(Status.SUCCESS.getCode())) {
throw new ServiceException(result.getMsg());
diff --git
a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/DolphinSchedulerManager.java
b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/DolphinSchedulerManager.java
index 7830220..a588fab 100644
---
a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/DolphinSchedulerManager.java
+++
b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/DolphinSchedulerManager.java
@@ -77,6 +77,7 @@ public class DolphinSchedulerManager {
logger.info("Start initializing the DolphinScheduler manager table
structure");
upgradeDao.initSchema();
}
+
public void upgradeDolphinScheduler() throws IOException {
// Gets a list of all upgrades
List<String> schemaList = SchemaUtils.getAllSchemaList();
@@ -99,12 +100,13 @@ public class DolphinSchedulerManager {
}
// The target version of the upgrade
String schemaVersion = "";
+ String currentVersion = version;
for (String schemaDir : schemaList) {
schemaVersion = schemaDir.split("_")[0];
if (SchemaUtils.isAGreatVersion(schemaVersion, version)) {
logger.info("upgrade DolphinScheduler metadata version
from {} to {}", version, schemaVersion);
logger.info("Begin upgrading DolphinScheduler's table
structure");
- upgradeDao.upgradeDolphinScheduler(schemaDir);
+ upgradeDao.upgradeDolphinScheduler(schemaDir);
if ("1.3.0".equals(schemaVersion)) {
upgradeDao.upgradeDolphinSchedulerWorkerGroup();
} else if ("1.3.2".equals(schemaVersion)) {
@@ -115,6 +117,10 @@ public class DolphinSchedulerManager {
version = schemaVersion;
}
}
+
+ if (SchemaUtils.isAGreatVersion("2.0.6", currentVersion) &&
SchemaUtils.isAGreatVersion(SchemaUtils.getSoftVersion(), currentVersion)) {
+ upgradeDao.upgradeDolphinSchedulerResourceFileSize();
+ }
}
// Assign the value of the version field in the version table to the
version of the product
diff --git
a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/ResourceDao.java
b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/ResourceDao.java
index 4ddb1a1..e4e8d13 100644
---
a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/ResourceDao.java
+++
b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/ResourceDao.java
@@ -17,17 +17,25 @@
package org.apache.dolphinscheduler.tools.datasource.dao;
+import java.sql.SQLException;
+import java.util.Objects;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import org.apache.directory.api.util.Strings;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Joiner;
+
/**
* resource dao
*/
@@ -66,4 +74,89 @@ public class ResourceDao {
return resourceMap;
}
+ /**
+ * sum the sizes of the non-directory resources of the given type under each of their ancestor paths
+ * @param conn connection used to run the query; only the statement is closed here, not the connection
+ * @param type value matched against the type column of t_ds_resources
+ * @return map whose key is an ancestor path of a file's full_name and whose value is the total size of the files beneath it
+ */
+ private Map<String, Long> listAllResourcesByFileType(Connection conn, int
type) {
+ Map<String, Long> resourceSizeMap = new HashMap<>();
+
+ String sql = String.format("SELECT full_name, type, size, is_directory
FROM t_ds_resources where type = %d", type);
+ ResultSet rs = null;
+ PreparedStatement pstmt = null;
+ try {
+ pstmt = conn.prepareStatement(sql);
+ rs = pstmt.executeQuery();
+
+ while (rs.next()) {
+ String fullName = rs.getString("full_name");
+ Boolean isDirectory = rs.getBoolean("is_directory");
+ long fileSize = rs.getLong("size");
+
+ if (StringUtils.isNotBlank(fullName) && !isDirectory) {
+ String[] splits = fullName.split("/");
+ for (int i = 1; i < splits.length; i++) {
+ String parentFullName =
Joiner.on("/").join(Arrays.copyOfRange(splits,0, splits.length - i));
+ if (Strings.isNotEmpty(parentFullName)) {
+ long size =
resourceSizeMap.getOrDefault(parentFullName, 0L);
+ resourceSizeMap.put(parentFullName, size +
fileSize);
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ throw new RuntimeException("sql: " + sql, e);
+ } finally {
+ if (Objects.nonNull(pstmt)) {
+ try {
+ if (!pstmt.isClosed()) {
+ pstmt.close();
+ }
+ } catch (SQLException e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+ }
+ return resourceSizeMap;
+ }
+
+ /**
+ * set the size of each directory row of the given type to the total computed by listAllResourcesByFileType, in one batch; directories with no files beneath them are not touched
+ * @param conn connection used for the query and the batch update; it is passed to ConnectionUtils.releaseResource in the finally block
+ * @param type value matched against the type column of t_ds_resources
+ */
+ public void updateResourceFolderSizeByFileType(Connection conn, int type) {
+ Map<String, Long> resourceSizeMap = listAllResourcesByFileType(conn,
type);
+
+ String sql = "UPDATE t_ds_resources SET size=? where type=? and
full_name=? and is_directory = true";
+ PreparedStatement pstmt = null;
+ try {
+ pstmt = conn.prepareStatement(sql);
+ for (Map.Entry<String, Long> entry : resourceSizeMap.entrySet()) {
+ pstmt.setLong(1, entry.getValue());
+ pstmt.setInt(2, type);
+ pstmt.setString(3, entry.getKey());
+ pstmt.addBatch();
+ }
+ pstmt.executeBatch();
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ throw new RuntimeException("sql: " + sql, e);
+ } finally {
+ if (Objects.nonNull(pstmt)) {
+ try {
+ if (!pstmt.isClosed()) {
+ pstmt.close();
+ }
+ } catch (SQLException e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+ ConnectionUtils.releaseResource(conn);
+ }
+ }
+
}
diff --git
a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/UpgradeDao.java
b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/UpgradeDao.java
index 06b8aab..8d08dd5 100644
---
a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/UpgradeDao.java
+++
b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/UpgradeDao.java
@@ -26,7 +26,6 @@ import org.apache.dolphinscheduler.common.enums.ConditionType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
-import
org.apache.dolphinscheduler.plugin.task.api.parameters.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@@ -42,6 +41,7 @@ import org.apache.dolphinscheduler.dao.upgrade.ScheduleDao;
import org.apache.dolphinscheduler.dao.upgrade.SchemaUtils;
import org.apache.dolphinscheduler.dao.upgrade.WorkerGroupDao;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
+import
org.apache.dolphinscheduler.plugin.task.api.parameters.TaskTimeoutParameter;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.commons.collections.CollectionUtils;
@@ -173,6 +173,21 @@ public abstract class UpgradeDao {
}
/**
+ * upgrade DolphinScheduler to 2.0.6: recompute resource folder sizes for type 0 (file) and type 1 (udf), each on its own connection; any failure is only logged (without the exception) and not rethrown
+ */
+ public void upgradeDolphinSchedulerResourceFileSize() {
+ ResourceDao resourceDao = new ResourceDao();
+ try {
+ // update the size of the folder that is the type of file.
+
resourceDao.updateResourceFolderSizeByFileType(dataSource.getConnection(), 0);
+ // update the size of the folder that is the type of udf.
+
resourceDao.updateResourceFolderSizeByFileType(dataSource.getConnection(), 1);
+ } catch (Exception ex) {
+ logger.error("Failed to upgrade because of failing to update the
folder's size of resource files.");
+ }
+ }
+
+ /**
* updateProcessDefinitionJsonWorkerGroup
*/
protected void updateProcessDefinitionJsonWorkerGroup() {