FayneBupt commented on code in PR #62689: URL: https://github.com/apache/doris/pull/62689#discussion_r3202192214
########## be/src/vec/sink/writer/paimon/vpaimon_partition_writer.cpp: ########## @@ -0,0 +1,269 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/sink/writer/paimon/vpaimon_partition_writer.h" + +#include <gen_cpp/DataSinks_types.h> + +#include <map> +#include <utility> + +#include "common/metrics/doris_metrics.h" +#include "format/arrow/arrow_block_convertor.h" +#include "format/arrow/arrow_row_batch.h" +#include "runtime/runtime_state.h" + +#ifdef WITH_PAIMON_CPP +#include <arrow/array.h> +#include <arrow/c/bridge.h> +#include <arrow/memory_pool.h> +#include <arrow/record_batch.h> +#include <arrow/status.h> +#include <arrow/type.h> + +#include <string> + +#include "paimon/file_store_write.h" +#include "paimon/record_batch.h" +#endif + +namespace doris { +namespace vectorized { + +#ifdef WITH_PAIMON_CPP +namespace { +class PaimonArrowMemPoolAdaptor : public arrow::MemoryPool { +public: + explicit PaimonArrowMemPoolAdaptor(std::shared_ptr<::paimon::MemoryPool> pool) + : pool_(std::move(pool)) {} + + arrow::Status Allocate(int64_t size, int64_t alignment, uint8_t** out) override { + *out = reinterpret_cast<uint8_t*>(pool_->Malloc(size, alignment)); Review Comment: Fixed. The Arrow memory pool adapter now returns arrow::Status::OutOfMemory when Malloc/Realloc returns null, preserves the previous pointer on failed realloc, and updates allocation stats only after successful allocation. ########## be/src/vec/sink/writer/paimon/paimon_doris_hdfs_file_system.cpp: ########## @@ -0,0 +1,439 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/sink/writer/paimon/paimon_doris_hdfs_file_system.h" + +#ifdef WITH_PAIMON_CPP + +#include <gen_cpp/PlanNodes_types.h> + +#include <algorithm> +#include <cstdint> +#include <map> +#include <memory> +#include <mutex> +#include <string> +#include <string_view> +#include <utility> +#include <vector> + +#include "common/status.h" +#include "io/fs/file_reader.h" +#include "io/fs/file_system.h" +#include "io/fs/file_writer.h" +#include "io/fs/hdfs_file_system.h" +#include "io/fs/path.h" +#include "io/hdfs_builder.h" +#include "paimon/factories/factory_creator.h" +#include "paimon/fs/file_system.h" +#include "paimon/fs/file_system_factory.h" +#include "paimon/result.h" +#include "paimon/status.h" +#include "util/slice.h" + +namespace { + +std::string _extract_hdfs_fs_name(const std::string& uri) { + auto starts_with = [&](const char* prefix) { return uri.rfind(prefix, 0) == 0; }; + std::string_view scheme; + if (starts_with("hdfs://")) { + scheme = "hdfs://"; + } else if (starts_with("dfs://")) { + scheme = "dfs://"; + } else { + return {}; + } + size_t authority_start = scheme.size(); + size_t first_slash = uri.find('/', authority_start); + if (first_slash == std::string::npos) { + return uri; + } + if (first_slash == authority_start) { + return {}; + } + return uri.substr(0, first_slash); +} + +paimon::Status _to_paimon_status(const doris::Status& st) { + if (st.ok()) { + return paimon::Status::OK(); + } + return paimon::Status::IOError(st.to_string()); +} + +class DorisPaimonBasicFileStatus final : public paimon::BasicFileStatus { +public: + DorisPaimonBasicFileStatus(std::string path, bool is_dir) + : _path(std::move(path)), _is_dir(is_dir) {} + + bool IsDir() const override { return _is_dir; } + + std::string GetPath() const override { return _path; } + +private: + std::string _path; + bool _is_dir; +}; + +class DorisPaimonFileStatus final : public paimon::FileStatus { +public: + DorisPaimonFileStatus(std::string path, bool is_dir, uint64_t len, int64_t mtime_ms) + : _path(std::move(path)), _is_dir(is_dir), _len(len), _mtime_ms(mtime_ms) {} + + uint64_t GetLen() const override { return _len; } + + bool IsDir() const override { return _is_dir; } + + std::string GetPath() const override { return _path; } + + int64_t GetModificationTime() const override { return _mtime_ms; } + +private: + std::string _path; + bool _is_dir; + uint64_t _len; + int64_t _mtime_ms; +}; + +class DorisPaimonInputStream final : public paimon::InputStream { +public: + DorisPaimonInputStream(doris::io::FileReaderSPtr reader, std::string uri) + : _reader(std::move(reader)), _uri(std::move(uri)) {} + + paimon::Status Close() override { return _to_paimon_status(_reader->close()); } + + paimon::Status Seek(int64_t offset, paimon::SeekOrigin origin) override { + int64_t base = 0; + switch (origin) { + case paimon::FS_SEEK_SET: + base = 0; + break; + case paimon::FS_SEEK_CUR: + base = _pos; + break; + case paimon::FS_SEEK_END: { + auto len = Length(); + if (!len.ok()) { + return len.status(); + } + base = static_cast<int64_t>(len.value()); + break; + } + default: + return paimon::Status::Invalid("invalid seek origin"); + } + int64_t next = base + offset; + if (next < 0) { + return paimon::Status::Invalid("negative seek position: ", next); + } + _pos = next; + return paimon::Status::OK(); + } + + paimon::Result<int64_t> GetPos() const override { return _pos; } + + paimon::Result<int32_t> Read(char* buffer, uint32_t size) override { + size_t bytes_read = 0; + doris::Slice slice(buffer, size); + doris::Status st = _reader->read_at(_pos, slice, &bytes_read, nullptr); + if (!st.ok()) { + return paimon::Status::IOError(st.to_string()); + } + _pos += static_cast<int64_t>(bytes_read); + return static_cast<int32_t>(bytes_read); + } + + paimon::Result<int32_t> Read(char* buffer, uint32_t size, uint64_t offset) override { + size_t bytes_read = 0; + doris::Slice slice(buffer, size); + doris::Status st = _reader->read_at(offset, slice, &bytes_read, nullptr); + if (!st.ok()) { + return paimon::Status::IOError(st.to_string()); + } + return static_cast<int32_t>(bytes_read); + } + + void ReadAsync(char* buffer, uint32_t size, uint64_t offset, + std::function<void(paimon::Status)>&& callback) override { + auto res = Read(buffer, size, offset); + if (res.ok()) { + callback(paimon::Status::OK()); + } else { + callback(res.status()); + } + } + + paimon::Result<std::string> GetUri() const override { return _uri; } + + paimon::Result<uint64_t> Length() const override { + return static_cast<uint64_t>(_reader->size()); + } + +private: + doris::io::FileReaderSPtr _reader; + std::string _uri; + int64_t _pos = 0; +}; + +class DorisPaimonOutputStream final : public paimon::OutputStream { +public: + DorisPaimonOutputStream(doris::io::FileWriterPtr writer, std::string uri) + : _writer(std::move(writer)), _uri(std::move(uri)) {} + + paimon::Status Close() override { return _to_paimon_status(_writer->close()); } + + paimon::Result<int32_t> Write(const char* buffer, uint32_t size) override { + doris::Slice slice(buffer, size); + doris::Status st = _writer->append(slice); + if (!st.ok()) { + return paimon::Status::IOError(st.to_string()); + } + _pos += size; + return static_cast<int32_t>(size); + } + + paimon::Status Flush() override { return paimon::Status::OK(); } + + paimon::Result<int64_t> GetPos() const override { return _pos; } + + paimon::Result<std::string> GetUri() const override { return _uri; } + +private: + doris::io::FileWriterPtr _writer; + std::string _uri; + int64_t _pos = 0; +}; + +class DorisPaimonFileSystem final : public paimon::FileSystem { +public: + explicit DorisPaimonFileSystem(std::shared_ptr<doris::io::FileSystem> fs) + : _fs(std::move(fs)) {} + + paimon::Result<std::unique_ptr<paimon::InputStream>> Open( + const std::string& path) const override { + doris::io::FileReaderSPtr reader; + doris::Status st = _fs->open_file(doris::io::Path(path), &reader, nullptr); + if (!st.ok()) { + return paimon::Status::IOError(st.to_string()); + } + return std::make_unique<DorisPaimonInputStream>(std::move(reader), path); + } + + paimon::Result<std::unique_ptr<paimon::OutputStream>> Create(const std::string& path, + bool overwrite) const override { + bool exists = false; + doris::Status exists_st = _fs->exists(doris::io::Path(path), &exists); + if (!exists_st.ok()) { + return paimon::Status::IOError(exists_st.to_string()); + } + if (exists) { + if (!overwrite) { + return paimon::Status::Exist("path already exists: ", path); + } + doris::Status del_st = _fs->delete_directory(doris::io::Path(path)); Review Comment: Fixed. Create(path, overwrite=true) now verifies the existing target is a file and deletes it with delete_file. If the target is a directory, it rejects the overwrite instead of recursively deleting the subtree. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
