pitrou commented on a change in pull request #9474:
URL: https://github.com/apache/arrow/pull/9474#discussion_r574720241
##########
File path: cpp/src/arrow/filesystem/filesystem.cc
##########
@@ -443,34 +449,37 @@ Result<std::shared_ptr<io::OutputStream>>
SlowFileSystem::OpenAppendStream(
}
Status CopyFiles(const std::vector<FileLocator>& sources,
- const std::vector<FileLocator>& destinations, int64_t
chunk_size,
- bool use_threads) {
+ const std::vector<FileLocator>& destinations,
+ const io::IOContext& io_context, int64_t chunk_size, bool
use_threads) {
if (sources.size() != destinations.size()) {
return Status::Invalid("Trying to copy ", sources.size(), " files into ",
destinations.size(), " paths.");
}
- return ::arrow::internal::OptionalParallelFor(
- use_threads, static_cast<int>(sources.size()), [&](int i) {
- if (sources[i].filesystem->Equals(destinations[i].filesystem)) {
- return sources[i].filesystem->CopyFile(sources[i].path,
destinations[i].path);
- }
+ auto copy_one_file = [&](int i) {
+ if (sources[i].filesystem->Equals(destinations[i].filesystem)) {
+ return sources[i].filesystem->CopyFile(sources[i].path,
destinations[i].path);
+ }
+
+ ARROW_ASSIGN_OR_RAISE(auto source,
+
sources[i].filesystem->OpenInputStream(sources[i].path));
- ARROW_ASSIGN_OR_RAISE(auto source,
-
sources[i].filesystem->OpenInputStream(sources[i].path));
+ ARROW_ASSIGN_OR_RAISE(auto destination,
destinations[i].filesystem->OpenOutputStream(
+ destinations[i].path));
+ RETURN_NOT_OK(internal::CopyStream(source, destination, chunk_size,
io_context));
+ return destination->Close();
Review comment:
Same comment here about explicitly closing files.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]