This is an automated email from the ASF dual-hosted git repository.
jin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hugegraph-toolchain.git
The following commit(s) were added to refs/heads/master by this push:
new c2f588ca fix(hubble): reset upload quota by job and fix >10GB
historical upload blockage (#725)
c2f588ca is described below
commit c2f588ca9cef5dce8707fbaa15c05ca051cad170
Author: Yifeng Xu <[email protected]>
AuthorDate: Wed Apr 15 14:21:11 2026 +0800
fix(hubble): reset upload quota by job and fix >10GB historical upload
blockage (#725)
* fix(hubble): reset upload quota by job and fix >10GB historical upload
blockage
* fix: followed by imbajin suggestion to avoid potential issue
---
.../controller/load/FileUploadController.java | 225 ++++++++++++++++++---
.../controller/load/JobManagerController.java | 6 +-
.../apache/hugegraph/entity/load/FileMapping.java | 5 +
.../hugegraph/service/load/FileMappingService.java | 205 +++++++++++++++----
.../hugegraph/service/load/JobManagerService.java | 52 +++++
.../src/main/resources/i18n/messages.properties | 5 +-
.../main/resources/i18n/messages_zh_CN.properties | 5 +-
.../data-import/import-tasks/UploadEntry.tsx | 13 +-
.../dataImportStore/dataImportRootStore.ts | 54 +++--
.../types/GraphManagementStore/dataImportStore.ts | 1 +
.../hubble-fe/src/stores/utils/index.ts | 40 +++-
.../hubble-fe/src/utils/dataImportUpload.ts | 61 ++++++
12 files changed, 569 insertions(+), 103 deletions(-)
diff --git
a/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/controller/load/FileUploadController.java
b/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/controller/load/FileUploadController.java
index ea29b74d..d019549a 100644
---
a/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/controller/load/FileUploadController.java
+++
b/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/controller/load/FileUploadController.java
@@ -24,6 +24,7 @@ import static
org.apache.hugegraph.service.load.FileMappingService.JOB_PREIFX;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -82,6 +83,7 @@ public class FileUploadController {
"load.upload.file.duplicate-name");
Map<String, String> tokens = new HashMap<>();
for (String fileName : fileNames) {
+ this.checkFileNameValid(fileName);
String token = this.service.generateFileToken(fileName);
Ex.check(!this.uploadingTokenLocks().containsKey(token),
"load.upload.file.token.existed");
@@ -96,13 +98,19 @@ public class FileUploadController {
@PathVariable("jobId") int jobId,
@RequestParam("file") MultipartFile file,
@RequestParam("name") String fileName,
+ @RequestParam(value = "size",
+ required = false)
+ Long fileSize,
@RequestParam("token") String token,
@RequestParam("total") int total,
@RequestParam("index") int index) {
this.checkTotalAndIndexValid(total, index);
+ this.checkFileNameValid(fileName);
this.checkFileNameMatchToken(fileName, token);
JobManager jobEntity = this.jobService.get(jobId);
- this.checkFileValid(connId, jobId, jobEntity, file, fileName);
+ Long sourceFileSize = this.resolveSourceFileSize(file, fileSize,
+ total, index);
+ this.checkFileValid(jobId, jobEntity, file, fileName);
if (jobEntity.getJobStatus() == JobStatus.DEFAULT) {
jobEntity.setJobStatus(JobStatus.UPLOADING);
this.jobService.update(jobEntity);
@@ -123,39 +131,57 @@ public class FileUploadController {
lock.readLock().lock();
try {
+ FileMapping reservedMapping;
+ synchronized (this.service) {
+ reservedMapping = this.reserveUploadQuota(connId, jobId,
+ fileName, filePath,
+ sourceFileSize);
+ }
result = this.service.uploadFile(file, index, filePath);
if (result.getStatus() == FileUploadResult.Status.FAILURE) {
return result;
}
synchronized (this.service) {
- // Verify the existence of fragmented files
FileMapping mapping = this.service.get(connId, jobId,
fileName);
if (mapping == null) {
- mapping = new FileMapping(connId, fileName, filePath);
- mapping.setJobId(jobId);
- mapping.setFileStatus(FileMappingStatus.UPLOADING);
- this.service.save(mapping);
- } else {
- if (mapping.getFileStatus() ==
FileMappingStatus.COMPLETED) {
- result.setId(mapping.getId());
- // Remove uploading file token
- this.uploadingTokenLocks().remove(token);
- return result;
- } else {
- mapping.setUpdateTime(HubbleUtil.nowDate());
- }
+ mapping = reservedMapping;
+ }
+ Ex.check(mapping != null, "load.file-mapping.not-exist.name",
+ fileName);
+ if (mapping.getFileStatus() == FileMappingStatus.COMPLETED) {
+ result.setId(mapping.getId());
+ this.uploadingTokenLocks().remove(token);
+ return result;
}
+ mapping.setUpdateTime(HubbleUtil.nowDate());
// Determine whether all the parts have been uploaded, then
merge them
boolean merged = this.service.tryMergePartFiles(filePath,
total);
if (!merged) {
this.service.update(mapping);
return result;
}
+ JobManager currentJob = this.jobService.get(jobId);
+ long actualFileSize;
+ try {
+ actualFileSize = this.resolveUploadedFileSize(
+ mapping.getPath());
+ Ex.check(currentJob != null, "job-manager.not-exist.id",
+ jobId);
+ long reservedUploadingSize =
+ this.sumReservedUploadingSize(jobId,
+ mapping.getId());
+ this.checkFileSizeLimit(actualFileSize,
+ currentJob.getJobSize(),
+ reservedUploadingSize);
+ } catch (RuntimeException e) {
+ this.cleanupFailedUpload(mapping, token);
+ throw e;
+ }
// Read column names and values then fill it
this.service.extractColumns(mapping);
mapping.setFileStatus(FileMappingStatus.COMPLETED);
mapping.setTotalLines(FileUtil.countLines(mapping.getPath()));
- mapping.setTotalSize(FileUtils.sizeOf(new
File(mapping.getPath())));
+ mapping.setTotalSize(actualFileSize);
// Move to the directory corresponding to the file mapping Id
String newPath = this.service.moveToNextLevelDir(mapping);
@@ -163,9 +189,9 @@ public class FileUploadController {
mapping.setPath(newPath);
this.service.update(mapping);
// Update Job Manager size
- long jobSize = jobEntity.getJobSize() + mapping.getTotalSize();
- jobEntity.setJobSize(jobSize);
- this.jobService.update(jobEntity);
+ long jobSize = currentJob.getJobSize() +
mapping.getTotalSize();
+ currentJob.setJobSize(jobSize);
+ this.jobService.update(currentJob);
result.setId(mapping.getId());
// Remove uploading file token
this.uploadingTokenLocks().remove(token);
@@ -181,6 +207,7 @@ public class FileUploadController {
@PathVariable("jobId") int jobId,
@RequestParam("name") String fileName,
@RequestParam("token") String token) {
+ this.checkFileNameValid(fileName);
JobManager jobEntity = this.jobService.get(jobId);
Ex.check(jobEntity != null, "job-manager.not-exist.id", jobId);
Ex.check(jobEntity.getJobStatus() == JobStatus.UPLOADING ||
@@ -243,7 +270,16 @@ public class FileUploadController {
"load.upload.file.name-token.unmatch");
}
- private void checkFileValid(int connId, int jobId, JobManager jobEntity,
+ private void checkFileNameValid(String fileName) {
+ Ex.check(StringUtils.isNotBlank(fileName) &&
+ fileName.equals(FilenameUtils.getName(fileName)) &&
+ !fileName.contains("/") &&
+ !fileName.contains("\\") &&
+ !fileName.contains(".."),
+ "load.upload.file.name.invalid");
+ }
+
+ private void checkFileValid(int jobId, JobManager jobEntity,
MultipartFile file, String fileName) {
Ex.check(jobEntity != null, "job-manager.not-exist.id", jobId);
Ex.check(jobEntity.getJobStatus() == JobStatus.DEFAULT ||
@@ -261,31 +297,154 @@ public class FileUploadController {
HubbleOptions.UPLOAD_FILE_FORMAT_LIST);
Ex.check(formatWhiteList.contains(format),
"load.upload.file.format.unsupported");
+ }
+
+ private FileMapping reserveUploadQuota(int connId, int jobId,
+ String fileName, String filePath,
+ Long sourceFileSize) {
+ JobManager currentJob = this.jobService.get(jobId);
+ Ex.check(currentJob != null, "job-manager.not-exist.id", jobId);
+
+ FileMapping mapping = this.service.get(connId, jobId, fileName);
+ Ex.check(mapping == null ||
+ mapping.getFileStatus() == FileMappingStatus.UPLOADING,
+ "load.upload.file.existed", fileName);
+
+ long reservedFileSize = this.resolveReservedFileSize(mapping,
+ sourceFileSize);
+ Integer mappingId = mapping == null ? null : mapping.getId();
+ long reservedUploadingSize = this.sumReservedUploadingSize(jobId,
+ mappingId);
+ this.checkFileSizeLimit(reservedFileSize, currentJob.getJobSize(),
+ reservedUploadingSize);
+
+ if (mapping == null) {
+ mapping = new FileMapping(connId, fileName, filePath);
+ mapping.setJobId(jobId);
+ this.fillUploadingReservation(mapping, reservedFileSize);
+ this.service.save(mapping);
+ return mapping;
+ }
+
+ mapping.setPath(filePath);
+ this.fillUploadingReservation(mapping, reservedFileSize);
+ this.service.update(mapping);
+ return mapping;
+ }
+
+ private Long resolveSourceFileSize(MultipartFile file, Long fileSize,
+ int total, int index) {
+ if (total == 1) {
+ return file.getSize();
+ }
+ if (fileSize != null) {
+ return fileSize;
+ }
+ if (index == 0) {
+ return this.estimateChunkedFileSizeUpperBound(file.getSize(),
+ total);
+ }
+ return null;
+ }
+
+ private void checkFileSizeLimit(long fileSize, long currentJobSize) {
+ this.checkFileSizeLimit(fileSize, currentJobSize, 0L);
+ }
+
+ private void checkFileSizeLimit(long fileSize, long currentJobSize,
+ long reservedUploadingSize) {
+ Ex.check(fileSize > 0L, "load.upload.file.cannot-be-empty");
- long fileSize = file.getSize();
long singleFileSizeLimit = this.config.get(
HubbleOptions.UPLOAD_SINGLE_FILE_SIZE_LIMIT);
Ex.check(fileSize <= singleFileSizeLimit,
"load.upload.file.exceed-single-size",
FileUtils.byteCountToDisplaySize(singleFileSizeLimit));
- // Check is there a file with the same name
- FileMapping oldMapping = this.service.get(connId, jobId, fileName);
- Ex.check(oldMapping == null ||
- oldMapping.getFileStatus() == FileMappingStatus.UPLOADING,
- "load.upload.file.existed", fileName);
-
long totalFileSizeLimit = this.config.get(
HubbleOptions.UPLOAD_TOTAL_FILE_SIZE_LIMIT);
- List<FileMapping> fileMappings = this.service.listAll();
- long currentTotalSize = fileMappings.stream()
- .map(FileMapping::getTotalSize)
- .reduce(0L, (Long::sum));
- Ex.check(fileSize + currentTotalSize <= totalFileSizeLimit,
- "load.upload.file.exceed-single-size",
+ long totalReservedSize = this.safeAdd(this.safeAdd(fileSize,
+ currentJobSize),
+ reservedUploadingSize);
+ Ex.check(totalReservedSize <= totalFileSizeLimit,
+ "load.upload.file.exceed-total-size",
FileUtils.byteCountToDisplaySize(totalFileSizeLimit));
}
+ private long resolveUploadedFileSize(String filePath) {
+ File uploadedFile = this.service.requirePathUnderUploadRoot(filePath);
+ if (!uploadedFile.exists() || !uploadedFile.isFile()) {
+ throw new InternalException("The uploaded file '%s' is not ready "
+
+ "for quota validation",
+ filePath);
+ }
+ return FileUtils.sizeOf(uploadedFile);
+ }
+
+ private void cleanupFailedUpload(FileMapping mapping, String token) {
+ this.uploadingTokenLocks().remove(token);
+ this.service.cleanupMappings(Collections.singletonList(mapping));
+ }
+
+ private long resolveReservedFileSize(FileMapping mapping,
+ Long sourceFileSize) {
+ long reservedFileSize = mapping == null ? 0L : mapping.getTotalSize();
+ if (sourceFileSize != null) {
+ return Math.max(reservedFileSize, sourceFileSize);
+ }
+ Ex.check(reservedFileSize > 0L,
+ "load.upload.file.size.missing-before-reserve");
+ return reservedFileSize;
+ }
+
+ private void fillUploadingReservation(FileMapping mapping,
+ long reservedFileSize) {
+ mapping.setFileStatus(FileMappingStatus.UPLOADING);
+ mapping.setTotalSize(reservedFileSize);
+ mapping.setUpdateTime(HubbleUtil.nowDate());
+ }
+
+ private long sumReservedUploadingSize(int jobId, Integer
excludedMappingId) {
+ List<FileMapping> mappings = this.service.listByJob(jobId);
+ if (mappings == null || mappings.isEmpty()) {
+ return 0L;
+ }
+
+ long reservedUploadingSize = 0L;
+ for (FileMapping mapping : mappings) {
+ if (mapping == null ||
+ mapping.getFileStatus() != FileMappingStatus.UPLOADING) {
+ continue;
+ }
+ if (excludedMappingId != null &&
+ excludedMappingId.equals(mapping.getId())) {
+ continue;
+ }
+ if (mapping.getTotalSize() <= 0L) {
+ continue;
+ }
+ reservedUploadingSize = this.safeAdd(reservedUploadingSize,
+ mapping.getTotalSize());
+ }
+ return reservedUploadingSize;
+ }
+
+ private long estimateChunkedFileSizeUpperBound(long chunkSize, int total) {
+ try {
+ return Math.multiplyExact(chunkSize, (long) total);
+ } catch (ArithmeticException ignored) {
+ return Long.MAX_VALUE;
+ }
+ }
+
+ private long safeAdd(long left, long right) {
+ try {
+ return Math.addExact(left, right);
+ } catch (ArithmeticException ignored) {
+ return Long.MAX_VALUE;
+ }
+ }
+
private String generateFilePath(int connId, int jobId, String fileName) {
String location = this.config.get(HubbleOptions.UPLOAD_FILE_LOCATION);
String path = Paths.get(CONN_PREIFX + connId, JOB_PREIFX + jobId)
diff --git
a/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/controller/load/JobManagerController.java
b/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/controller/load/JobManagerController.java
index 5bb90526..12ccaa41 100644
---
a/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/controller/load/JobManagerController.java
+++
b/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/controller/load/JobManagerController.java
@@ -104,11 +104,7 @@ public class JobManagerController {
@DeleteMapping("{id}")
public void delete(@PathVariable("id") int id) {
- JobManager task = this.service.get(id);
- if (task == null) {
- throw new ExternalException("job.manager.not-exist.id", id);
- }
- this.service.remove(id);
+ this.service.deleteJob(id);
}
@GetMapping("{id}")
diff --git
a/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/entity/load/FileMapping.java
b/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/entity/load/FileMapping.java
index 53f8a525..5f23c213 100644
---
a/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/entity/load/FileMapping.java
+++
b/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/entity/load/FileMapping.java
@@ -116,6 +116,11 @@ public class FileMapping {
@JsonProperty("update_time")
private Date updateTime;
+ @JsonProperty("total_size_bytes")
+ public long getTotalSizeBytes() {
+ return this.totalSize;
+ }
+
public FileMapping(int connId, String name, String path) {
this(connId, name, path, HubbleUtil.nowDate());
}
diff --git
a/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/service/load/FileMappingService.java
b/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/service/load/FileMappingService.java
index e574ad79..f174c890 100644
---
a/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/service/load/FileMappingService.java
+++
b/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/service/load/FileMappingService.java
@@ -27,15 +27,21 @@ import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Date;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
@@ -45,8 +51,10 @@ import org.apache.hugegraph.entity.enums.FileMappingStatus;
import org.apache.hugegraph.entity.load.FileMapping;
import org.apache.hugegraph.entity.load.FileSetting;
import org.apache.hugegraph.entity.load.FileUploadResult;
+import org.apache.hugegraph.entity.load.JobManager;
import org.apache.hugegraph.exception.InternalException;
import org.apache.hugegraph.mapper.load.FileMappingMapper;
+import org.apache.hugegraph.mapper.load.JobManagerMapper;
import org.apache.hugegraph.options.HubbleOptions;
import org.apache.hugegraph.util.Ex;
import org.apache.hugegraph.util.HubbleUtil;
@@ -78,6 +86,8 @@ public class FileMappingService {
private HugeConfig config;
@Autowired
private FileMappingMapper mapper;
+ @Autowired
+ private JobManagerMapper jobManagerMapper;
private final Map<String, ReadWriteLock> uploadingTokenLocks;
@@ -105,6 +115,12 @@ public class FileMappingService {
return this.mapper.selectList(null);
}
+ public List<FileMapping> listByJob(int jobId) {
+ QueryWrapper<FileMapping> query = Wrappers.query();
+ query.eq("job_id", jobId);
+ return this.mapper.selectList(query);
+ }
+
public IPage<FileMapping> list(int connId, int jobId, int pageNo, int
pageSize) {
QueryWrapper<FileMapping> query = Wrappers.query();
query.eq("conn_id", connId);
@@ -137,7 +153,7 @@ public class FileMappingService {
}
public String generateFileToken(String fileName) {
- return HubbleUtil.md5(fileName) + "-" +
+ return this.fileTokenPrefix(fileName) +
HubbleUtil.nowTime().getEpochSecond();
}
@@ -238,7 +254,7 @@ public class FileMappingService {
}
public void extractColumns(FileMapping mapping) {
- File file = FileUtils.getFile(mapping.getPath());
+ File file = this.requirePathUnderUploadRoot(mapping.getPath());
BufferedReader reader;
try {
reader = new BufferedReader(new FileReader(file));
@@ -289,7 +305,7 @@ public class FileMappingService {
}
public String moveToNextLevelDir(FileMapping mapping) {
- File currFile = new File(mapping.getPath());
+ File currFile = this.requirePathUnderUploadRoot(mapping.getPath());
String destPath = Paths.get(currFile.getParentFile().getPath(),
FILE_PREIFX + mapping.getId())
.toString();
@@ -305,31 +321,32 @@ public class FileMappingService {
}
public void deleteDiskFile(FileMapping mapping) {
- File file = new File(mapping.getPath());
+ File file = this.requirePathUnderUploadRoot(mapping.getPath());
if (file.isDirectory()) {
- log.info("Prepare to delete directory {}", file);
- try {
- FileUtils.forceDelete(file);
- } catch (IOException e) {
- throw new InternalException("Failed to delete directory " +
- "corresponded to the file id %s, "
+
- "please delete it manually",
- e, mapping.getId());
- }
+ this.deletePathIfExists(file, mapping.getId());
} else {
File parentDir = file.getParentFile();
- log.info("Prepare to delete directory {}", parentDir);
- try {
- FileUtils.forceDelete(parentDir);
- } catch (IOException e) {
- throw new InternalException("Failed to delete parent directory
" +
- "corresponded to the file id %s, "
+
- "please delete it manually",
- e, mapping.getId());
+ if (parentDir == null) {
+ log.info("Skip deleting file mapping {} because {} has no " +
+ "parent directory", mapping.getId(),
mapping.getPath());
+ return;
}
+ this.deletePathIfExists(parentDir, mapping.getId());
+ }
+ }
+
+ public void cleanupMappings(List<FileMapping> mappings) {
+ for (FileMapping mapping : mappings) {
+ this.tryCleanupMapping(mapping);
}
}
+ @Async
+ @Scheduled(fixedRate = 10 * 60 * 1000)
+ public void deleteOrphanedJobFiles() {
+ this.cleanupMappings(this.listOrphanedJobFiles());
+ }
+
@Async
@Scheduled(fixedRate = 10 * 60 * 1000)
public void deleteUnfinishedFile() {
@@ -343,24 +360,136 @@ public class FileMappingService {
Date updateTime = mapping.getUpdateTime();
long duration = now.getTime() - updateTime.getTime();
if (duration > threshold) {
- String filePath = mapping.getPath();
- try {
- FileUtils.forceDelete(new File(filePath));
- } catch (IOException e) {
- log.warn("Failed to delete expired uploading file {}",
- filePath, e);
- }
- this.remove(mapping.getId());
- // Delete corresponding uploading tokens
- Iterator<Map.Entry<String, ReadWriteLock>> iter;
- iter = this.uploadingTokenLocks.entrySet().iterator();
- iter.forEachRemaining(entry -> {
- String token = entry.getKey();
- if (token.startsWith(mapping.getName())) {
- iter.remove();
- }
- });
+ this.tryDeleteUnfinishedMapping(mapping);
}
}
}
+
+ public File requirePathUnderUploadRoot(String filePath) {
+ return this.requirePathUnderUploadRoot(new File(filePath));
+ }
+
+ private void deletePathIfExists(File path, int mappingId) {
+ File safePath = this.requirePathUnderUploadRoot(path);
+ if (!safePath.exists()) {
+ log.info("Skip deleting path {} for mapping {} because it no " +
+ "longer exists", safePath, mappingId);
+ return;
+ }
+
+ log.info("Prepare to delete directory {}", safePath);
+ try {
+ FileUtils.forceDelete(safePath);
+ } catch (IOException e) {
+ throw new InternalException("Failed to delete directory " +
+ "corresponded to the file id %s, " +
+ "please delete it manually",
+ e, mappingId);
+ }
+ }
+
+ private List<FileMapping> listOrphanedJobFiles() {
+ List<FileMapping> mappings = this.mapper.selectList(null);
+ if (mappings.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ Set<Integer> jobIds = mappings.stream()
+ .map(FileMapping::getJobId)
+ .filter(jobId -> jobId != null)
+ .collect(Collectors.toSet());
+ if (jobIds.isEmpty()) {
+ return new ArrayList<>(mappings);
+ }
+
+ List<JobManager> jobs = this.jobManagerMapper.selectBatchIds(jobIds);
+ Set<Integer> existingJobIds = jobs.stream()
+ .map(JobManager::getId)
+ .collect(Collectors.toCollection(
+ HashSet::new));
+ return mappings.stream()
+ .filter(mapping -> mapping.getJobId() == null ||
+ !existingJobIds.contains(
+ mapping.getJobId()))
+ .collect(Collectors.toList());
+ }
+
+ private void tryCleanupMapping(FileMapping mapping) {
+ try {
+ this.deleteDiskFile(mapping);
+ this.removeCleanupRecord(mapping.getId());
+ } catch (RuntimeException e) {
+ log.warn("Failed to cleanup disk file for mapping {} at {}",
+ mapping.getId(), mapping.getPath(), e);
+ }
+ }
+
+ private void removeCleanupRecord(int mappingId) {
+ int deleted = this.mapper.deleteById(mappingId);
+ if (deleted == 1) {
+ return;
+ }
+ if (deleted == 0) {
+ log.info("Skip removing file mapping {} because it no longer " +
+ "exists", mappingId);
+ return;
+ }
+ throw new InternalException("entity.delete.failed", mappingId);
+ }
+
+ private File requirePathUnderUploadRoot(File file) {
+ Path uploadRootPath = this.normalizePath(new File(
+ this.config.get(HubbleOptions.UPLOAD_FILE_LOCATION)));
+ Path targetPath = this.normalizePath(file);
+ if (!targetPath.startsWith(uploadRootPath)) {
+ throw new InternalException("load.upload.file.path.outside-root",
+ targetPath, uploadRootPath);
+ }
+ return targetPath.toFile();
+ }
+
+ private Path normalizePath(File file) {
+ Path path = file.toPath();
+ try {
+ if (file.exists()) {
+ return path.toRealPath();
+ }
+ } catch (IOException e) {
+ throw new InternalException("Failed to resolve upload path '%s'",
+ e, file);
+ }
+ return path.toAbsolutePath().normalize();
+ }
+
+ private void tryDeleteUnfinishedMapping(FileMapping mapping) {
+ String filePath = mapping.getPath();
+ try {
+ FileUtils.forceDelete(this.requirePathUnderUploadRoot(filePath));
+ } catch (IOException e) {
+ log.warn("Failed to delete expired uploading file {}",
+ filePath, e);
+ } catch (RuntimeException e) {
+ log.warn("Skip deleting expired uploading file {} because the " +
+ "path is invalid", filePath, e);
+ return;
+ }
+ this.remove(mapping.getId());
+ this.removeUploadingTokens(mapping.getName());
+ }
+
+ private void removeUploadingTokens(String fileName) {
+ String tokenPrefix = this.fileTokenPrefix(fileName);
+ Iterator<Map.Entry<String, ReadWriteLock>> iter;
+ iter = this.uploadingTokenLocks.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<String, ReadWriteLock> entry = iter.next();
+ if (entry.getKey().startsWith(tokenPrefix)) {
+ iter.remove();
+ }
+ }
+ }
+
+ private String fileTokenPrefix(String fileName) {
+ return HubbleUtil.md5(fileName) + "-";
+ }
}
diff --git
a/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/service/load/JobManagerService.java
b/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/service/load/JobManagerService.java
index 6e337421..eb085c39 100644
---
a/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/service/load/JobManagerService.java
+++
b/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/service/load/JobManagerService.java
@@ -18,13 +18,16 @@
package org.apache.hugegraph.service.load;
+import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.apache.hugegraph.entity.enums.JobStatus;
import org.apache.hugegraph.entity.enums.LoadStatus;
+import org.apache.hugegraph.entity.load.FileMapping;
import org.apache.hugegraph.entity.load.JobManager;
import org.apache.hugegraph.entity.load.LoadTask;
+import org.apache.hugegraph.exception.ExternalException;
import org.apache.hugegraph.exception.InternalException;
import org.apache.hugegraph.mapper.load.JobManagerMapper;
import org.apache.hugegraph.util.HubbleUtil;
@@ -32,6 +35,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Transactional;
+import org.springframework.transaction.support.TransactionSynchronization;
+import
org.springframework.transaction.support.TransactionSynchronizationManager;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
@@ -48,6 +53,8 @@ public class JobManagerService {
private JobManagerMapper mapper;
@Autowired
private LoadTaskService taskService;
+ @Autowired
+ private FileMappingService fileMappingService;
public int count() {
return this.mapper.selectCount(null);
@@ -132,4 +139,49 @@ public class JobManagerService {
throw new InternalException("entity.delete.failed", id);
}
}
+
+ @Transactional(isolation = Isolation.READ_COMMITTED)
+ public void deleteJob(int id) {
+ JobManager job = this.get(id);
+ if (job == null) {
+ throw new ExternalException("job.manager.not-exist.id", id);
+ }
+
+ List<LoadTask> loadTasks = this.taskService.taskListByJob(id);
+ for (LoadTask loadTask : loadTasks) {
+ if (loadTask.getStatus().inRunning() ||
+ loadTask.getStatus() == LoadStatus.PAUSED) {
+ this.taskService.stop(loadTask.getId());
+ }
+ this.taskService.remove(loadTask.getId());
+ }
+
+ List<FileMapping> mappings = this.fileMappingService.listByJob(id);
+ this.remove(id);
+ this.deleteDiskFilesAfterCommit(mappings);
+ }
+
+ private void deleteDiskFilesAfterCommit(List<FileMapping> mappings) {
+ if (mappings.isEmpty()) {
+ return;
+ }
+
+ List<FileMapping> copiedMappings = new ArrayList<>(mappings);
+ if (!TransactionSynchronizationManager.isSynchronizationActive()) {
+ this.deleteDiskFiles(copiedMappings);
+ return;
+ }
+
+ TransactionSynchronizationManager.registerSynchronization(
+ new TransactionSynchronization() {
+ @Override
+ public void afterCommit() {
+ deleteDiskFiles(copiedMappings);
+ }
+ });
+ }
+
+ private void deleteDiskFiles(List<FileMapping> mappings) {
+ this.fileMappingService.cleanupMappings(mappings);
+ }
}
diff --git
a/hugegraph-hubble/hubble-be/src/main/resources/i18n/messages.properties
b/hugegraph-hubble/hubble-be/src/main/resources/i18n/messages.properties
index 117b297f..ddbb082d 100644
--- a/hugegraph-hubble/hubble-be/src/main/resources/i18n/messages.properties
+++ b/hugegraph-hubble/hubble-be/src/main/resources/i18n/messages.properties
@@ -135,8 +135,11 @@ load.upload.file.duplicate-name=Don't allow duplicate file
names to obtain token
load.upload.file.name-token.unmatch=The upload file name doesn't match with
token
load.upload.files.cannot-dup=The upload files can't contain duplicates
load.upload.file.exceed-single-size=The upload file has exceeded single limit
size {0}
-load.upload.file.exceed-total-size=The upload file has exceeded total limit
size {0}
+load.upload.file.exceed-total-size=The upload files in current job exceed
total size limit {0}
load.upload.file.existed=The upload file {0} has existed
+load.upload.file.name.invalid=The upload file name must be a plain file name
+load.upload.file.path.outside-root=The upload path {0} is outside the
configured upload directory {1}
+load.upload.file.size.missing-before-reserve=The upload size must be reserved
before accepting more chunks
load.file-mapping.not-exist.id=The file doesn't exist with id {0}
load.file-mapping.not-exist.name=The file doesn't exist with name {0}
load.file-mapping.file-setting.delimiter-cannot-be-empty=The delimiter can't
be null or empty
diff --git
a/hugegraph-hubble/hubble-be/src/main/resources/i18n/messages_zh_CN.properties
b/hugegraph-hubble/hubble-be/src/main/resources/i18n/messages_zh_CN.properties
index 4ef261b5..7c333d38 100644
---
a/hugegraph-hubble/hubble-be/src/main/resources/i18n/messages_zh_CN.properties
+++
b/hugegraph-hubble/hubble-be/src/main/resources/i18n/messages_zh_CN.properties
@@ -135,8 +135,11 @@ load.upload.file.duplicate-name=不允许重复的文件名同时获取 token
load.upload.file.name-token.unmatch=上传文件的名称与 token 不匹配
load.upload.files.cannot-dup=上传的文件不能包含重复的
load.upload.file.exceed-single-size=上传文件大小超过了限制 {0}
-load.upload.file.exceed-total-size=上传文件总大小超过了限制 {0}
+load.upload.file.exceed-total-size=当前导入任务中的上传文件总大小超过了限制 {0}
load.upload.file.existed=上传的文件 {0} 已存在
+load.upload.file.name.invalid=上传文件名必须是纯文件名,不能包含路径信息
+load.upload.file.path.outside-root=上传路径 {0} 不在配置的上传目录 {1} 下
+load.upload.file.size.missing-before-reserve=服务端尚未为该文件预留大小,不能继续接收后续分片
load.file-mapping.not-exist.id=不存在 id 为 {0} 的文件
load.file-mapping.not-exist.name=不存在名为 {0} 的文件
load.file-mapping.file-setting.delimiter-cannot-be-empty=分隔符不能为空
diff --git
a/hugegraph-hubble/hubble-fe/src/components/graph-management/data-import/import-tasks/UploadEntry.tsx
b/hugegraph-hubble/hubble-fe/src/components/graph-management/data-import/import-tasks/UploadEntry.tsx
index 84f1132d..0449f24c 100644
---
a/hugegraph-hubble/hubble-fe/src/components/graph-management/data-import/import-tasks/UploadEntry.tsx
+++
b/hugegraph-hubble/hubble-fe/src/components/graph-management/data-import/import-tasks/UploadEntry.tsx
@@ -35,6 +35,7 @@ import { CancellablePromise } from 'mobx/lib/api/flow';
import { DataImportRootStoreContext } from '../../../../stores';
import { useInitDataImport } from '../../../../hooks';
+import { isCurrentJobUploadSizeExceeded } from
'../../../../utils/dataImportUpload';
import type { FileUploadResult } from
'../../../../stores/types/GraphManagementStore/dataImportStore';
@@ -207,11 +208,13 @@ export const FileDropZone: React.FC = observer(() => {
});
}
- const totalSize = filteredFiles
- .map(({ size }) => size)
- .reduce((prev, curr) => prev + curr, 0);
-
- if (totalSize / GB > 10) {
+ if (
+ isCurrentJobUploadSizeExceeded(
+ dataMapStore.fileMapInfos,
+ dataImportRootStore.fileUploadTasks,
+ filteredFiles
+ )
+ ) {
Message.error({
content: `${t('upload-files.over-all-size-limit')}`,
size: 'medium',
diff --git
a/hugegraph-hubble/hubble-fe/src/stores/GraphManagementStore/dataImportStore/dataImportRootStore.ts
b/hugegraph-hubble/hubble-fe/src/stores/GraphManagementStore/dataImportStore/dataImportRootStore.ts
index 451613e0..fe25499c 100644
---
a/hugegraph-hubble/hubble-fe/src/stores/GraphManagementStore/dataImportStore/dataImportRootStore.ts
+++
b/hugegraph-hubble/hubble-fe/src/stores/GraphManagementStore/dataImportStore/dataImportRootStore.ts
@@ -38,7 +38,7 @@ import {
EdgeType,
EdgeTypeListResponse
} from '../../types/GraphManagementStore/metadataConfigsStore';
-import { checkIfLocalNetworkOffline } from '../../utils';
+import { checkIfLocalNetworkOffline, getErrorMessage } from '../../utils';
const MAX_CONCURRENT_UPLOAD = 5;
@@ -79,6 +79,20 @@ export class DataImportRootStore {
return this.fileUploadTasks.filter(({ status }) => status !== 'success');
}
+ findFileUploadTask(fileName: string) {
+ return this.fileUploadTasks.find(({ name }) => name === fileName);
+ }
+
+ getRequiredFileUploadTask(fileName: string) {
+ const fileUploadTask = this.findFileUploadTask(fileName);
+
+ if (isUndefined(fileUploadTask)) {
+ throw new Error(`Upload task '${fileName}' not found`);
+ }
+
+ return fileUploadTask;
+ }
+
@action
setCurrentId(id: number) {
this.currentId = id;
@@ -134,9 +148,7 @@ export class DataImportRootStore {
value: FileUploadTask[T],
fileName: string
) {
- const fileUploadTask = this.fileUploadTasks.find(
- ({ name }) => name === fileName
- )!;
+ const fileUploadTask = this.findFileUploadTask(fileName);
// users may click back button in browser
if (!isUndefined(fileUploadTask)) {
@@ -206,9 +218,10 @@ export class DataImportRootStore {
this.fileHashes = { ...this.fileHashes, ...result.data.data };
this.requestStatus.fetchFilehashes = 'success';
} catch (error) {
+ const errorMessage = getErrorMessage(error);
this.requestStatus.fetchFilehashes = 'failed';
- this.errorInfo.fetchFilehashes.message = error.message;
- console.error(error.message);
+ this.errorInfo.fetchFilehashes.message = errorMessage;
+ console.error(errorMessage);
}
});
@@ -236,9 +249,10 @@ export class DataImportRootStore {
}
try {
+ const fileSize = this.getRequiredFileUploadTask(fileName).size;
const result: AxiosResponse<responseData<FileUploadResult>> = yield axios
.post<responseData<FileUploadResult>>(
-
`${baseUrl}/${this.currentId}/job-manager/${this.currentJobId}/upload-file?total=${fileChunkTotal}&index=${fileChunkList.chunkIndex}&name=${fileName}&token=${this.fileHashes[fileName]}`,
+
`${baseUrl}/${this.currentId}/job-manager/${this.currentJobId}/upload-file?total=${fileChunkTotal}&index=${fileChunkList.chunkIndex}&name=${fileName}&size=${fileSize}&token=${this.fileHashes[fileName]}`,
formData,
{
headers: {
@@ -257,9 +271,10 @@ export class DataImportRootStore {
this.requestStatus.uploadFiles = 'success';
return result.data.data;
} catch (error) {
+ const errorMessage = getErrorMessage(error);
this.requestStatus.uploadFiles = 'failed';
- this.errorInfo.uploadFiles.message = error.message;
- console.error(error.message);
+ this.errorInfo.uploadFiles.message = errorMessage;
+ console.error(errorMessage);
}
});
@@ -283,9 +298,10 @@ export class DataImportRootStore {
this.requestStatus.deleteFiles = 'success';
} catch (error) {
+ const errorMessage = getErrorMessage(error);
this.requestStatus.deleteFiles = 'failed';
- this.errorInfo.deleteFiles.message = error.message;
- console.error(error.message);
+ this.errorInfo.deleteFiles.message = errorMessage;
+ console.error(errorMessage);
}
});
@@ -308,9 +324,10 @@ export class DataImportRootStore {
this.requestStatus.sendUploadCompleteSignal = 'success';
} catch (error) {
+ const errorMessage = getErrorMessage(error);
this.requestStatus.sendUploadCompleteSignal = 'failed';
- this.errorInfo.sendUploadCompleteSignal.message = error.message;
- console.error(error.message);
+ this.errorInfo.sendUploadCompleteSignal.message = errorMessage;
+ console.error(errorMessage);
}
});
@@ -333,9 +350,10 @@ export class DataImportRootStore {
this.requestStatus.sendMappingCompleteSignal = 'success';
} catch (error) {
+ const errorMessage = getErrorMessage(error);
this.requestStatus.sendMappingCompleteSignal = 'failed';
- this.errorInfo.sendMappingCompleteSignal.message = error.message;
- console.error(error.message);
+ this.errorInfo.sendMappingCompleteSignal.message = errorMessage;
+ console.error(errorMessage);
}
});
@@ -366,8 +384,9 @@ export class DataImportRootStore {
this.vertexTypes = result.data.data.records;
this.requestStatus.fetchVertexTypeList = 'success';
} catch (error) {
+ const errorMessage = getErrorMessage(error);
this.requestStatus.fetchVertexTypeList = 'failed';
- this.errorInfo.fetchVertexTypeList.message = error.message;
+ this.errorInfo.fetchVertexTypeList.message = errorMessage;
}
});
@@ -398,8 +417,9 @@ export class DataImportRootStore {
this.edgeTypes = result.data.data.records;
this.requestStatus.fetchEdgeTypeList = 'success';
} catch (error) {
+ const errorMessage = getErrorMessage(error);
this.requestStatus.fetchEdgeTypeList = 'failed';
- this.errorInfo.fetchEdgeTypeList.message = error.message;
+ this.errorInfo.fetchEdgeTypeList.message = errorMessage;
}
});
}
diff --git
a/hugegraph-hubble/hubble-fe/src/stores/types/GraphManagementStore/dataImportStore.ts
b/hugegraph-hubble/hubble-fe/src/stores/types/GraphManagementStore/dataImportStore.ts
index fbeb8c03..28a1d81b 100644
---
a/hugegraph-hubble/hubble-fe/src/stores/types/GraphManagementStore/dataImportStore.ts
+++
b/hugegraph-hubble/hubble-fe/src/stores/types/GraphManagementStore/dataImportStore.ts
@@ -148,6 +148,7 @@ export interface FileMapInfo {
name: string;
total_lines: number;
total_size: string;
+ total_size_bytes: number;
file_setting: FileConfig;
file_status: string;
vertex_mappings: VertexMap[];
diff --git a/hugegraph-hubble/hubble-fe/src/stores/utils/index.ts
b/hugegraph-hubble/hubble-fe/src/stores/utils/index.ts
index a2d2c490..36e82017 100644
--- a/hugegraph-hubble/hubble-fe/src/stores/utils/index.ts
+++ b/hugegraph-hubble/hubble-fe/src/stores/utils/index.ts
@@ -52,10 +52,44 @@ export const edgeWidthMapping: Record<string, number> = {
/* functions */
-export function checkIfLocalNetworkOffline(error: any) {
- if (error.request) {
- throw new Error(i18next.t('addition.store.network-error'));
+export function getErrorMessage(error: unknown) {
+ if (error instanceof Error) {
+ return error.message;
}
+
+ if (typeof error === 'string') {
+ return error;
+ }
+
+ if (
+ typeof error === 'object' &&
+ error !== null &&
+ 'message' in error
+ ) {
+ const errorWithMessage = error as { message?: unknown };
+
+ if (typeof errorWithMessage.message === 'string') {
+ return errorWithMessage.message;
+ }
+ }
+
+ return String(error);
+}
+
+export function checkIfLocalNetworkOffline(error: unknown) {
+ if (
+ typeof error === 'object' &&
+ error !== null &&
+ 'request' in error
+ ) {
+ const errorWithRequest = error as { request?: unknown };
+
+ if (!isUndefined(errorWithRequest.request)) {
+ throw new Error(i18next.t('addition.store.network-error'));
+ }
+ }
+
+ throw error;
}
export function mapMetadataProperties(
diff --git a/hugegraph-hubble/hubble-fe/src/utils/dataImportUpload.ts
b/hugegraph-hubble/hubble-fe/src/utils/dataImportUpload.ts
new file mode 100644
index 00000000..a36467b3
--- /dev/null
+++ b/hugegraph-hubble/hubble-fe/src/utils/dataImportUpload.ts
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+const GB = 1024 * 1024 * 1024;
+
+type UploadedFile = {
+ name: string;
+ total_size_bytes: number;
+};
+
+type LocalUploadTask = {
+ name: string;
+ size: number;
+};
+
+export const getCurrentJobUploadSize = (
+ uploadedFiles: UploadedFile[],
+ localUploadTasks: LocalUploadTask[]
+) => {
+ const uploadedFileNames = uploadedFiles.map(({ name }) => name);
+ const uploadedFileSize = uploadedFiles.reduce(
+ (sum, { total_size_bytes }) => sum + total_size_bytes,
+ 0
+ );
+ const pendingFileSize = localUploadTasks
+ .filter(({ name }) => !uploadedFileNames.includes(name))
+ .reduce((sum, { size }) => sum + size, 0);
+
+ return uploadedFileSize + pendingFileSize;
+};
+
+export const isCurrentJobUploadSizeExceeded = (
+ uploadedFiles: UploadedFile[],
+ localUploadTasks: LocalUploadTask[],
+ selectedFiles: File[],
+ totalLimit = 10 * GB
+) => {
+ const selectedFileSize = selectedFiles.reduce(
+ (sum, { size }) => sum + size,
+ 0
+ );
+
+ return (
+ getCurrentJobUploadSize(uploadedFiles, localUploadTasks) +
selectedFileSize >
+ totalLimit
+ );
+};