FayneBupt commented on code in PR #62689: URL: https://github.com/apache/doris/pull/62689#discussion_r3202183253
########## be/src/vec/sink/vpaimon_table_writer.cpp: ########## @@ -0,0 +1,740 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/sink/vpaimon_table_writer.h" + +#include <gen_cpp/DataSinks_types.h> + +#include <algorithm> +#include <cstdint> +#include <map> +#include <memory> +#include <string> +#include <string_view> +#include <utility> +#include <vector> + +#include "common/metrics/doris_metrics.h" +#include "core/block/block.h" +#include "core/column/column.h" +#include "exprs/vexpr.h" +#include "exprs/vexpr_context.h" +#include "runtime/query_context.h" +#include "runtime/runtime_profile.h" +#include "runtime/runtime_state.h" +#include "util/defer_op.h" +#include "vec/sink/paimon_writer_utils.h" +#include "vec/sink/writer/paimon/paimon_doris_hdfs_file_system.h" +#include "vec/sink/writer/paimon/vpaimon_partition_writer.h" + +#ifdef WITH_PAIMON_CPP +#include <arrow/array.h> +#include <arrow/c/bridge.h> +#include <arrow/record_batch.h> +#include <arrow/type.h> + +#include <cstdint> + +#include "format/arrow/arrow_block_convertor.h" +#include "format/arrow/arrow_row_batch.h" +#include "format/parquet/arrow_memory_pool.h" +#include "io/fs/hdfs_file_system.h" +#include "io/hdfs_builder.h" +#include "paimon/commit_message.h" +#include "paimon/factories/factory_creator.h" +#include "paimon/file_store_write.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/metrics.h" +#include "paimon/utils/bucket_id_calculator.h" +#include "paimon/write_context.h" +#include "vec/sink/writer/paimon/paimon_doris_memory_pool.h" + +// Force link paimon file format factories +namespace paimon { +namespace parquet {} +} // namespace paimon + +#endif + +namespace doris { +namespace vectorized { + +#ifdef WITH_PAIMON_CPP +namespace { +bool is_paimon_cpp_time_metric(std::string_view name) { + return name.size() > 3 && name.substr(name.size() - 3) == "_ns"; +} + +void attach_paimon_cpp_metrics_to_profile(RuntimeProfile* profile, + const std::shared_ptr<::paimon::Metrics>& metrics) { + if (profile == nullptr || !metrics) { + return; + } + auto all = metrics->GetAllCounters(); + if (all.empty()) { + return; + } + for (const auto& kv : all) { + std::string counter_name = "PaimonCpp_" + kv.first; + std::replace(counter_name.begin(), counter_name.end(), '.', '_'); + RuntimeProfile::Counter* counter = nullptr; + if (is_paimon_cpp_time_metric(kv.first)) { + counter = ADD_COUNTER(profile, counter_name, TUnit::TIME_NS); + } else { + counter = ADD_COUNTER(profile, counter_name, TUnit::UNIT); + } + COUNTER_UPDATE(counter, kv.second); + } +} +} // namespace +#endif + +VPaimonTableWriter::~VPaimonTableWriter() = default; + +VPaimonTableWriter::VPaimonTableWriter(const TDataSink& t_sink, + const VExprContextSPtrs& output_exprs) + : VPaimonTableWriter(t_sink, output_exprs, nullptr, nullptr) {} + +VPaimonTableWriter::VPaimonTableWriter(const TDataSink& t_sink, + const VExprContextSPtrs& output_exprs, + std::shared_ptr<Dependency> dep, + std::shared_ptr<Dependency> fin_dep) + : AsyncResultWriter(output_exprs, std::move(dep), std::move(fin_dep)), _t_sink(t_sink) { + DCHECK(_t_sink.__isset.paimon_table_sink); +} + +Status VPaimonTableWriter::init_properties(ObjectPool* /*pool*/) { + // Currently there is no extra property to initialize. Kept for symmetry + // with VIcebergTableWriter and future paimon-cpp wiring. + return Status::OK(); +} + +Status VPaimonTableWriter::open(RuntimeState* state, RuntimeProfile* profile) { + _state = state; + _profile = profile; +#ifndef WITH_PAIMON_CPP + return Status::NotSupported("paimon-cpp is not enabled"); +#else + _written_rows_counter = ADD_COUNTER(_profile, "WrittenRows", TUnit::UNIT); + _written_bytes_counter = ADD_COUNTER(_profile, "WrittenBytes", TUnit::BYTES); + _send_data_timer = ADD_TIMER(_profile, "SendDataTime"); + _project_timer = ADD_CHILD_TIMER(_profile, "ProjectTime", "SendDataTime"); + _bucket_calc_timer = ADD_CHILD_TIMER(_profile, "BucketCalcTime", "SendDataTime"); + _partition_writers_dispatch_timer = + ADD_CHILD_TIMER(_profile, "PartitionsDispatchTime", "SendDataTime"); + _partition_writers_write_timer = + ADD_CHILD_TIMER(_profile, "PartitionsWriteTime", "SendDataTime"); + _partition_writers_count = ADD_COUNTER(_profile, "PartitionsWriteCount", TUnit::UNIT); + _partition_writer_created = ADD_COUNTER(_profile, "PartitionWriterCreated", TUnit::UNIT); + _open_timer = ADD_TIMER(_profile, "OpenTime"); + _close_timer = ADD_TIMER(_profile, "CloseTime"); + _prepare_commit_timer = ADD_TIMER(_profile, "PrepareCommitTime"); + _serialize_commit_messages_timer = ADD_TIMER(_profile, "SerializeCommitMessagesTime"); + _commit_payload_bytes_counter = ADD_COUNTER(_profile, "CommitPayloadBytes", TUnit::BYTES); + + SCOPED_TIMER(_open_timer); + + ensure_paimon_doris_hdfs_file_system_registered(); + + auto registered_types = paimon::FactoryCreator::GetInstance()->GetRegisteredType(); + std::string types_str; + bool has_parquet = false; + for (const auto& t : registered_types) { + types_str += t + ", "; + has_parquet |= (t == "parquet"); + } + if (!has_parquet) { + return Status::InternalError( + "paimon-cpp parquet file format factory is not registered (missing 'parquet' in " + "FactoryCreator). Please ensure BE is built with WITH_PAIMON_CPP=ON and linked " + "with libpaimon_parquet_file_format.a (whole-archive). Registered factories: {}", + types_str); + } + + _pool = std::make_shared<PaimonDorisMemoryPool>(_state->query_mem_tracker()); + const auto& paimon_sink = _t_sink.paimon_table_sink; + if (!paimon_sink.__isset.table_location || paimon_sink.table_location.empty()) { + return Status::InvalidArgument("paimon table location is empty"); + } + std::string commit_user; + if (paimon_sink.__isset.options) { + auto it = paimon_sink.options.find("doris.commit_user"); + if (it != paimon_sink.options.end()) { + commit_user = it->second; + } + } + if (commit_user.empty()) { + commit_user = _state->user(); + } + + std::map<std::string, std::string> options; + if (paimon_sink.__isset.options) { + for (const auto& kv : paimon_sink.options) { + if (kv.first.rfind("doris.", 0) == 0) { + continue; + } + options.emplace(kv.first, kv.second); + } + } + + // Workaround for paimon-cpp issue where it defaults to LocalFileSystem if path has no scheme. + // If table_location is missing scheme (common in HDFS setup without full URI), + // and fs.defaultFS is provided in options, we prepend it. + std::string table_location = paimon_sink.table_location; + if (table_location.find("://") == std::string::npos) { + auto it = options.find("fs.defaultFS"); + if (it != options.end()) { + std::string default_fs = it->second; + // Remove trailing slash from default_fs if present + while (!default_fs.empty() && default_fs.back() == '/') { + default_fs.pop_back(); + } + // Remove leading slash from table_location if present + if (!table_location.empty() && table_location.front() == '/') { + table_location = default_fs + table_location; + } else { + table_location = default_fs + "/" + table_location; + } + } + } + + int64_t buffer_size = 256 * 1024 * 1024L; // Default 256MB + + if (_state->query_options().__isset.paimon_write_buffer_size && + _state->query_options().paimon_write_buffer_size > 0) { + buffer_size = _state->query_options().paimon_write_buffer_size; + } + + bool enable_adaptive = true; + if (_state->query_options().__isset.enable_paimon_adaptive_buffer_size) { + enable_adaptive = _state->query_options().enable_paimon_adaptive_buffer_size; + } + + if (enable_adaptive && paimon_sink.__isset.bucket_num && paimon_sink.bucket_num > 0) { + int bucket_num = paimon_sink.bucket_num; + buffer_size = get_paimon_write_buffer_size(buffer_size, true, bucket_num); + LOG(INFO) << "Adaptive Paimon Buffer Size: bucket_num=" << bucket_num + << ", adjusted_buffer_size=" << buffer_size; + } + LOG(INFO) << "Paimon Native Writer Final Buffer Size: " << buffer_size + << " (enable_adaptive=" << enable_adaptive << ")"; + options["write-buffer-size"] = std::to_string(buffer_size); + + if (_state->query_options().__isset.paimon_target_file_size && + _state->query_options().paimon_target_file_size > 0) { + options["target-file-size"] = + std::to_string(_state->query_options().paimon_target_file_size); + LOG(INFO) << "Paimon Native Writer Target File Size: " + << _state->query_options().paimon_target_file_size; + } + + options["file.format"] = "parquet"; + options["manifest.format"] = "parquet"; + if ((!paimon_sink.__isset.bucket_num || paimon_sink.bucket_num <= 0) && + paimon_sink.__isset.bucket_keys && !paimon_sink.bucket_keys.empty()) { + return Status::NotSupported( + "paimon-cpp native writer does not support primary-key table with dynamic bucket " + "(bucket=-1) yet; enable_paimon_jni_writer=true is required"); + } + + ::paimon::WriteContextBuilder builder(table_location, commit_user); + builder.SetOptions(options); + builder.WithIgnorePreviousFiles(true); + builder.WithMemoryPool(_pool); + builder.WithFileSystemSchemeToIdentifierMap( + {{"hdfs", kPaimonDorisHdfsFsIdentifier}, {"dfs", kPaimonDorisHdfsFsIdentifier}}); + auto ctx_result = builder.Finish(); + if (!ctx_result.ok()) { + return Status::InternalError("failed to build paimon write context: {}", + ctx_result.status().ToString()); + } + auto write_result = ::paimon::FileStoreWrite::Create(std::move(ctx_result).value()); + if (!write_result.ok()) { + auto message = write_result.status().ToString(); + if (message.find("format 'orc'") != std::string::npos || + message.find("format \"orc\"") != std::string::npos) { + return Status::InternalError( + "failed to create paimon file store write: {}. This Doris build does not " + "support ORC in paimon-cpp; create paimon table with options " + "file.format=parquet and manifest.format=parquet.", + message); + } + return Status::InternalError("failed to create paimon file store write: {}", message); + } + _file_store_write = std::move(write_result).value(); + return Status::OK(); +#endif +} + +Status VPaimonTableWriter::_init_partition_column_indices(const ::doris::Block& block) const { + const TPaimonTableSink& paimon_sink = _t_sink.paimon_table_sink; + if (!paimon_sink.__isset.partition_keys || paimon_sink.partition_keys.empty() || + _partition_indices_inited) { + return Status::OK(); + } + + std::unordered_map<std::string, int> name_to_idx; + for (int i = 0; i < block.columns(); ++i) { + std::string col_name = block.get_by_position(i).name; + if (col_name.empty()) { + if (paimon_sink.__isset.column_names && i < paimon_sink.column_names.size()) { + col_name = paimon_sink.column_names[i]; + } + } + name_to_idx.emplace(col_name, i); + } + _partition_column_indices.clear(); + _partition_column_indices.reserve(paimon_sink.partition_keys.size()); + for (const auto& key_name : paimon_sink.partition_keys) { + auto it = name_to_idx.find(key_name); + if (it == name_to_idx.end()) { + return Status::InvalidArgument("paimon partition key {} not found in output block", + key_name); + } + _partition_column_indices.push_back(it->second); + } + _partition_indices_inited = true; + return Status::OK(); +} + +std::string VPaimonTableWriter::_default_partition_name() const { + std::string default_part = "__DEFAULT_PARTITION__"; + const TPaimonTableSink& paimon_sink = _t_sink.paimon_table_sink; + if (paimon_sink.__isset.options) { + auto it = paimon_sink.options.find("partition.default-name"); + if (it != paimon_sink.options.end() && !it->second.empty()) { + default_part = it->second; + } + } + return default_part; +} + +Status VPaimonTableWriter::_collect_partition_value_columns( + const ::doris::Block& block, + std::vector<std::vector<std::string>>* partition_value_columns) const { + partition_value_columns->clear(); + const TPaimonTableSink& paimon_sink = _t_sink.paimon_table_sink; + if (!paimon_sink.__isset.partition_keys || paimon_sink.partition_keys.empty()) { + return Status::OK(); + } + + RETURN_IF_ERROR(_init_partition_column_indices(block)); + const size_t rows = block.rows(); + partition_value_columns->resize(_partition_column_indices.size()); + const std::string default_part = _default_partition_name(); + DataTypeSerDe::FormatOptions options; + for (size_t part_idx = 0; part_idx < _partition_column_indices.size(); ++part_idx) { + auto& values = (*partition_value_columns)[part_idx]; + values.resize(rows); + const auto& col_with_type = block.get_by_position(_partition_column_indices[part_idx]); + const auto& type = col_with_type.type; + const auto& col = col_with_type.column; + for (size_t row = 0; row < rows; ++row) { + if (col->is_null_at(row)) { + values[row] = default_part; + } else { + values[row] = type->to_string(*col, row, options); + } + } + } + return Status::OK(); +} + +Status VPaimonTableWriter::_get_or_create_writer(const WriteKey& key, + std::shared_ptr<VPaimonPartitionWriter>* writer) { + auto it = _writers.find(key); + if (it != _writers.end()) { + *writer = it->second; + return Status::OK(); + } + + auto new_writer = + std::make_shared<VPaimonPartitionWriter>(_t_sink, key.partition_values, key.bucket_id +#ifdef WITH_PAIMON_CPP + , + _file_store_write.get(), _pool +#endif + ); + RETURN_IF_ERROR(new_writer->init_properties(nullptr)); + RETURN_IF_ERROR(new_writer->open(_state, _profile)); + COUNTER_UPDATE(_partition_writer_created, 1); + DorisMetrics::instance()->paimon_partition_writer_created->increment(1); + _writers.emplace(key, new_writer); + *writer = std::move(new_writer); + return Status::OK(); +} + +Status VPaimonTableWriter::write(RuntimeState* state, ::doris::Block& block) { +#ifndef WITH_PAIMON_CPP + return Status::NotSupported("paimon-cpp is not enabled"); +#else + if (block.rows() == 0) { + return Status::OK(); + } + SCOPED_TIMER(_send_data_timer); + int64_t send_data_ns = 0; + auto to_ms_ceil = [](int64_t ns) -> uint64_t { + if (ns <= 0) { + return 0; + } + return static_cast<uint64_t>((ns + 999999) / 1000000); + }; + Defer record_send_data_latency {[&]() { + DorisMetrics::instance()->paimon_write_send_data_latency_ms->add(to_ms_ceil(send_data_ns)); + }}; + SCOPED_RAW_TIMER(&send_data_ns); + + Block output_block; + int64_t project_ns = 0; + { + SCOPED_TIMER(_project_timer); + SCOPED_RAW_TIMER(&project_ns); + RETURN_IF_ERROR(_projection_block(block, &output_block)); + } + _row_count += output_block.rows(); Review Comment: Fixed. The native paimon-cpp writer now updates RuntimeState::update_num_rows_load_total() and update_num_bytes_load_total() after the projected block is accepted, matching the JNI writer path so FE can report loaded rows/bytes correctly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
