suxiaogang223 commented on code in PR #62689:
URL: https://github.com/apache/doris/pull/62689#discussion_r3199759452


##########
be/src/vec/sink/vpaimon_table_writer.cpp:
##########
@@ -0,0 +1,740 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/sink/vpaimon_table_writer.h"
+
+#include <gen_cpp/DataSinks_types.h>
+
+#include <algorithm>
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <string>
+#include <string_view>
+#include <utility>
+#include <vector>
+
+#include "common/metrics/doris_metrics.h"
+#include "core/block/block.h"
+#include "core/column/column.h"
+#include "exprs/vexpr.h"
+#include "exprs/vexpr_context.h"
+#include "runtime/query_context.h"
+#include "runtime/runtime_profile.h"
+#include "runtime/runtime_state.h"
+#include "util/defer_op.h"
+#include "vec/sink/paimon_writer_utils.h"
+#include "vec/sink/writer/paimon/paimon_doris_hdfs_file_system.h"
+#include "vec/sink/writer/paimon/vpaimon_partition_writer.h"
+
+#ifdef WITH_PAIMON_CPP
+#include <arrow/array.h>
+#include <arrow/c/bridge.h>
+#include <arrow/record_batch.h>
+#include <arrow/type.h>
+
+#include <cstdint>
+
+#include "format/arrow/arrow_block_convertor.h"
+#include "format/arrow/arrow_row_batch.h"
+#include "format/parquet/arrow_memory_pool.h"
+#include "io/fs/hdfs_file_system.h"
+#include "io/hdfs_builder.h"
+#include "paimon/commit_message.h"
+#include "paimon/factories/factory_creator.h"
+#include "paimon/file_store_write.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/metrics.h"
+#include "paimon/utils/bucket_id_calculator.h"
+#include "paimon/write_context.h"
+#include "vec/sink/writer/paimon/paimon_doris_memory_pool.h"
+
+// Force link paimon file format factories
+namespace paimon {
+namespace parquet {}
+} // namespace paimon
+
+#endif
+
+namespace doris {
+namespace vectorized {
+
+#ifdef WITH_PAIMON_CPP
+namespace {
+bool is_paimon_cpp_time_metric(std::string_view name) {
+    return name.size() > 3 && name.substr(name.size() - 3) == "_ns";
+}
+
+void attach_paimon_cpp_metrics_to_profile(RuntimeProfile* profile,
+                                          const 
std::shared_ptr<::paimon::Metrics>& metrics) {
+    if (profile == nullptr || !metrics) {
+        return;
+    }
+    auto all = metrics->GetAllCounters();
+    if (all.empty()) {
+        return;
+    }
+    for (const auto& kv : all) {
+        std::string counter_name = "PaimonCpp_" + kv.first;
+        std::replace(counter_name.begin(), counter_name.end(), '.', '_');
+        RuntimeProfile::Counter* counter = nullptr;
+        if (is_paimon_cpp_time_metric(kv.first)) {
+            counter = ADD_COUNTER(profile, counter_name, TUnit::TIME_NS);
+        } else {
+            counter = ADD_COUNTER(profile, counter_name, TUnit::UNIT);
+        }
+        COUNTER_UPDATE(counter, kv.second);
+    }
+}
+} // namespace
+#endif
+
+VPaimonTableWriter::~VPaimonTableWriter() = default;
+
+VPaimonTableWriter::VPaimonTableWriter(const TDataSink& t_sink,
+                                       const VExprContextSPtrs& output_exprs)
+        : VPaimonTableWriter(t_sink, output_exprs, nullptr, nullptr) {}
+
+VPaimonTableWriter::VPaimonTableWriter(const TDataSink& t_sink,
+                                       const VExprContextSPtrs& output_exprs,
+                                       std::shared_ptr<Dependency> dep,
+                                       std::shared_ptr<Dependency> fin_dep)
+        : AsyncResultWriter(output_exprs, std::move(dep), std::move(fin_dep)), 
_t_sink(t_sink) {
+    DCHECK(_t_sink.__isset.paimon_table_sink);
+}
+
+Status VPaimonTableWriter::init_properties(ObjectPool* /*pool*/) {
+    // Currently there is no extra property to initialize. Kept for symmetry
+    // with VIcebergTableWriter and future paimon-cpp wiring.
+    return Status::OK();
+}
+
+Status VPaimonTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {
+    _state = state;
+    _profile = profile;
+#ifndef WITH_PAIMON_CPP
+    return Status::NotSupported("paimon-cpp is not enabled");
+#else
+    _written_rows_counter = ADD_COUNTER(_profile, "WrittenRows", TUnit::UNIT);
+    _written_bytes_counter = ADD_COUNTER(_profile, "WrittenBytes", 
TUnit::BYTES);
+    _send_data_timer = ADD_TIMER(_profile, "SendDataTime");
+    _project_timer = ADD_CHILD_TIMER(_profile, "ProjectTime", "SendDataTime");
+    _bucket_calc_timer = ADD_CHILD_TIMER(_profile, "BucketCalcTime", 
"SendDataTime");
+    _partition_writers_dispatch_timer =
+            ADD_CHILD_TIMER(_profile, "PartitionsDispatchTime", 
"SendDataTime");
+    _partition_writers_write_timer =
+            ADD_CHILD_TIMER(_profile, "PartitionsWriteTime", "SendDataTime");
+    _partition_writers_count = ADD_COUNTER(_profile, "PartitionsWriteCount", 
TUnit::UNIT);
+    _partition_writer_created = ADD_COUNTER(_profile, 
"PartitionWriterCreated", TUnit::UNIT);
+    _open_timer = ADD_TIMER(_profile, "OpenTime");
+    _close_timer = ADD_TIMER(_profile, "CloseTime");
+    _prepare_commit_timer = ADD_TIMER(_profile, "PrepareCommitTime");
+    _serialize_commit_messages_timer = ADD_TIMER(_profile, 
"SerializeCommitMessagesTime");
+    _commit_payload_bytes_counter = ADD_COUNTER(_profile, 
"CommitPayloadBytes", TUnit::BYTES);
+
+    SCOPED_TIMER(_open_timer);
+
+    ensure_paimon_doris_hdfs_file_system_registered();
+
+    auto registered_types = 
paimon::FactoryCreator::GetInstance()->GetRegisteredType();
+    std::string types_str;
+    bool has_parquet = false;
+    for (const auto& t : registered_types) {
+        types_str += t + ", ";
+        has_parquet |= (t == "parquet");
+    }
+    if (!has_parquet) {
+        return Status::InternalError(
+                "paimon-cpp parquet file format factory is not registered 
(missing 'parquet' in "
+                "FactoryCreator). Please ensure BE is built with 
WITH_PAIMON_CPP=ON and linked "
+                "with libpaimon_parquet_file_format.a (whole-archive). 
Registered factories: {}",
+                types_str);
+    }
+
+    _pool = 
std::make_shared<PaimonDorisMemoryPool>(_state->query_mem_tracker());
+    const auto& paimon_sink = _t_sink.paimon_table_sink;
+    if (!paimon_sink.__isset.table_location || 
paimon_sink.table_location.empty()) {
+        return Status::InvalidArgument("paimon table location is empty");
+    }
+    std::string commit_user;
+    if (paimon_sink.__isset.options) {
+        auto it = paimon_sink.options.find("doris.commit_user");
+        if (it != paimon_sink.options.end()) {
+            commit_user = it->second;
+        }
+    }
+    if (commit_user.empty()) {
+        commit_user = _state->user();
+    }
+
+    std::map<std::string, std::string> options;
+    if (paimon_sink.__isset.options) {
+        for (const auto& kv : paimon_sink.options) {
+            if (kv.first.rfind("doris.", 0) == 0) {
+                continue;
+            }
+            options.emplace(kv.first, kv.second);
+        }
+    }
+
+    // Workaround for paimon-cpp issue where it defaults to LocalFileSystem if 
path has no scheme.
+    // If table_location is missing scheme (common in HDFS setup without full 
URI),
+    // and fs.defaultFS is provided in options, we prepend it.
+    std::string table_location = paimon_sink.table_location;
+    if (table_location.find("://") == std::string::npos) {
+        auto it = options.find("fs.defaultFS");
+        if (it != options.end()) {
+            std::string default_fs = it->second;
+            // Remove trailing slash from default_fs if present
+            while (!default_fs.empty() && default_fs.back() == '/') {
+                default_fs.pop_back();
+            }
+            // Remove leading slash from table_location if present
+            if (!table_location.empty() && table_location.front() == '/') {
+                table_location = default_fs + table_location;
+            } else {
+                table_location = default_fs + "/" + table_location;
+            }
+        }
+    }
+
+    int64_t buffer_size = 256 * 1024 * 1024L; // Default 256MB
+
+    if (_state->query_options().__isset.paimon_write_buffer_size &&
+        _state->query_options().paimon_write_buffer_size > 0) {
+        buffer_size = _state->query_options().paimon_write_buffer_size;
+    }
+
+    bool enable_adaptive = true;
+    if (_state->query_options().__isset.enable_paimon_adaptive_buffer_size) {
+        enable_adaptive = 
_state->query_options().enable_paimon_adaptive_buffer_size;
+    }
+
+    if (enable_adaptive && paimon_sink.__isset.bucket_num && 
paimon_sink.bucket_num > 0) {
+        int bucket_num = paimon_sink.bucket_num;
+        buffer_size = get_paimon_write_buffer_size(buffer_size, true, 
bucket_num);
+        LOG(INFO) << "Adaptive Paimon Buffer Size: bucket_num=" << bucket_num
+                  << ", adjusted_buffer_size=" << buffer_size;
+    }
+    LOG(INFO) << "Paimon Native Writer Final Buffer Size: " << buffer_size
+              << " (enable_adaptive=" << enable_adaptive << ")";
+    options["write-buffer-size"] = std::to_string(buffer_size);
+
+    if (_state->query_options().__isset.paimon_target_file_size &&
+        _state->query_options().paimon_target_file_size > 0) {
+        options["target-file-size"] =
+                
std::to_string(_state->query_options().paimon_target_file_size);
+        LOG(INFO) << "Paimon Native Writer Target File Size: "
+                  << _state->query_options().paimon_target_file_size;
+    }
+
+    options["file.format"] = "parquet";
+    options["manifest.format"] = "parquet";
+    if ((!paimon_sink.__isset.bucket_num || paimon_sink.bucket_num <= 0) &&
+        paimon_sink.__isset.bucket_keys && !paimon_sink.bucket_keys.empty()) {
+        return Status::NotSupported(
+                "paimon-cpp native writer does not support primary-key table 
with dynamic bucket "
+                "(bucket=-1) yet; enable_paimon_jni_writer=true is required");
+    }
+
+    ::paimon::WriteContextBuilder builder(table_location, commit_user);
+    builder.SetOptions(options);
+    builder.WithIgnorePreviousFiles(true);
+    builder.WithMemoryPool(_pool);
+    builder.WithFileSystemSchemeToIdentifierMap(
+            {{"hdfs", kPaimonDorisHdfsFsIdentifier}, {"dfs", 
kPaimonDorisHdfsFsIdentifier}});
+    auto ctx_result = builder.Finish();
+    if (!ctx_result.ok()) {
+        return Status::InternalError("failed to build paimon write context: 
{}",
+                                     ctx_result.status().ToString());
+    }
+    auto write_result = 
::paimon::FileStoreWrite::Create(std::move(ctx_result).value());
+    if (!write_result.ok()) {
+        auto message = write_result.status().ToString();
+        if (message.find("format 'orc'") != std::string::npos ||
+            message.find("format \"orc\"") != std::string::npos) {
+            return Status::InternalError(
+                    "failed to create paimon file store write: {}. This Doris 
build does not "
+                    "support ORC in paimon-cpp; create paimon table with 
options "
+                    "file.format=parquet and manifest.format=parquet.",
+                    message);
+        }
+        return Status::InternalError("failed to create paimon file store 
write: {}", message);
+    }
+    _file_store_write = std::move(write_result).value();
+    return Status::OK();
+#endif
+}
+
+Status VPaimonTableWriter::_init_partition_column_indices(const 
::doris::Block& block) const {
+    const TPaimonTableSink& paimon_sink = _t_sink.paimon_table_sink;
+    if (!paimon_sink.__isset.partition_keys || 
paimon_sink.partition_keys.empty() ||
+        _partition_indices_inited) {
+        return Status::OK();
+    }
+
+    std::unordered_map<std::string, int> name_to_idx;
+    for (int i = 0; i < block.columns(); ++i) {
+        std::string col_name = block.get_by_position(i).name;
+        if (col_name.empty()) {
+            if (paimon_sink.__isset.column_names && i < 
paimon_sink.column_names.size()) {
+                col_name = paimon_sink.column_names[i];
+            }
+        }
+        name_to_idx.emplace(col_name, i);
+    }
+    _partition_column_indices.clear();
+    _partition_column_indices.reserve(paimon_sink.partition_keys.size());
+    for (const auto& key_name : paimon_sink.partition_keys) {
+        auto it = name_to_idx.find(key_name);
+        if (it == name_to_idx.end()) {
+            return Status::InvalidArgument("paimon partition key {} not found 
in output block",
+                                           key_name);
+        }
+        _partition_column_indices.push_back(it->second);
+    }
+    _partition_indices_inited = true;
+    return Status::OK();
+}
+
+std::string VPaimonTableWriter::_default_partition_name() const {
+    std::string default_part = "__DEFAULT_PARTITION__";
+    const TPaimonTableSink& paimon_sink = _t_sink.paimon_table_sink;
+    if (paimon_sink.__isset.options) {
+        auto it = paimon_sink.options.find("partition.default-name");
+        if (it != paimon_sink.options.end() && !it->second.empty()) {
+            default_part = it->second;
+        }
+    }
+    return default_part;
+}
+
+Status VPaimonTableWriter::_collect_partition_value_columns(
+        const ::doris::Block& block,
+        std::vector<std::vector<std::string>>* partition_value_columns) const {
+    partition_value_columns->clear();
+    const TPaimonTableSink& paimon_sink = _t_sink.paimon_table_sink;
+    if (!paimon_sink.__isset.partition_keys || 
paimon_sink.partition_keys.empty()) {
+        return Status::OK();
+    }
+
+    RETURN_IF_ERROR(_init_partition_column_indices(block));
+    const size_t rows = block.rows();
+    partition_value_columns->resize(_partition_column_indices.size());
+    const std::string default_part = _default_partition_name();
+    DataTypeSerDe::FormatOptions options;
+    for (size_t part_idx = 0; part_idx < _partition_column_indices.size(); 
++part_idx) {
+        auto& values = (*partition_value_columns)[part_idx];
+        values.resize(rows);
+        const auto& col_with_type = 
block.get_by_position(_partition_column_indices[part_idx]);
+        const auto& type = col_with_type.type;
+        const auto& col = col_with_type.column;
+        for (size_t row = 0; row < rows; ++row) {
+            if (col->is_null_at(row)) {
+                values[row] = default_part;
+            } else {
+                values[row] = type->to_string(*col, row, options);
+            }
+        }
+    }
+    return Status::OK();
+}
+
+Status VPaimonTableWriter::_get_or_create_writer(const WriteKey& key,
+                                                 
std::shared_ptr<VPaimonPartitionWriter>* writer) {
+    auto it = _writers.find(key);
+    if (it != _writers.end()) {
+        *writer = it->second;
+        return Status::OK();
+    }
+
+    auto new_writer =
+            std::make_shared<VPaimonPartitionWriter>(_t_sink, 
key.partition_values, key.bucket_id
+#ifdef WITH_PAIMON_CPP
+                                                     ,
+                                                     _file_store_write.get(), 
_pool
+#endif
+            );
+    RETURN_IF_ERROR(new_writer->init_properties(nullptr));
+    RETURN_IF_ERROR(new_writer->open(_state, _profile));
+    COUNTER_UPDATE(_partition_writer_created, 1);
+    DorisMetrics::instance()->paimon_partition_writer_created->increment(1);
+    _writers.emplace(key, new_writer);
+    *writer = std::move(new_writer);
+    return Status::OK();
+}
+
+Status VPaimonTableWriter::write(RuntimeState* state, ::doris::Block& block) {
+#ifndef WITH_PAIMON_CPP
+    return Status::NotSupported("paimon-cpp is not enabled");
+#else
+    if (block.rows() == 0) {
+        return Status::OK();
+    }
+    SCOPED_TIMER(_send_data_timer);
+    int64_t send_data_ns = 0;
+    auto to_ms_ceil = [](int64_t ns) -> uint64_t {
+        if (ns <= 0) {
+            return 0;
+        }
+        return static_cast<uint64_t>((ns + 999999) / 1000000);
+    };
+    Defer record_send_data_latency {[&]() {
+        
DorisMetrics::instance()->paimon_write_send_data_latency_ms->add(to_ms_ceil(send_data_ns));
+    }};
+    SCOPED_RAW_TIMER(&send_data_ns);
+
+    Block output_block;
+    int64_t project_ns = 0;
+    {
+        SCOPED_TIMER(_project_timer);
+        SCOPED_RAW_TIMER(&project_ns);
+        RETURN_IF_ERROR(_projection_block(block, &output_block));
+    }
+    _row_count += output_block.rows();

Review Comment:
   The JNI writer updates `RuntimeState` load counters after accepting rows, 
but this native paimon-cpp path only updates the local profile counters and 
metrics. FE derives INSERT OK rows, audit return rows, and `loadedRows` from BE 
load counters, so inserts with `enable_paimon_jni_writer=false` can succeed 
while reporting 0 loaded rows. Please call `update_num_rows_load_total()` and 
`update_num_bytes_load_total()` on this path as well, after the projected block 
is accepted.



##########
fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniWriter.java:
##########
@@ -0,0 +1,709 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.paimon;
+
+import 
org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
+import 
org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.holders.NullableTimeStampMicroHolder;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericMap;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.io.DataOutputSerializer;
+import org.apache.paimon.memory.HeapMemorySegmentPool;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageSerializer;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BinaryType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarBinaryType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.InputStream;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class PaimonJniWriter {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PaimonJniWriter.class);
+
+    private BatchTableWrite writer;
+    private final BufferAllocator allocator;
+    private final CommitMessageSerializer serializer = new 
CommitMessageSerializer();
+    private String tableLocation;
+    private static final String PAIMON_OPTION_PREFIX = "paimon.";
+    private static final String HADOOP_OPTION_PREFIX = "hadoop.";
+
+    private PreExecutionAuthenticator preExecutionAuthenticator;
+    private final ClassLoader classLoader;
+
+    private Map<String, String> paimonOptionParams;
+    private Map<String, String> hadoopOptionParams;
+
+    private List<DataField> paimonFields;
+    private Map<String, DataField> paimonFieldMap;
+    private DataType[] targetTypes;
+
+    private IOManager ioManager;
+    private HeapMemorySegmentPool memorySegmentPool;
+    private boolean isDynamicBucketMode;
+
+    public PaimonJniWriter() {
+        this.allocator = new RootAllocator(Long.MAX_VALUE);
+        this.classLoader = this.getClass().getClassLoader();
+        try {
+            setWarnLevel("org.apache.paimon.shade.org.apache.parquet");
+            setWarnLevel("org.apache.paimon");
+        } catch (Throwable t) {
+            LOG.warn(t.getMessage(), t);
+        }
+    }
+
+    private void setWarnLevel(String loggerName) {
+        org.slf4j.Logger targetLogger = 
org.slf4j.LoggerFactory.getLogger(loggerName);
+        try {
+            Class<?> logbackLoggerClass = 
Class.forName("ch.qos.logback.classic.Logger");
+            Class<?> levelClass = 
Class.forName("ch.qos.logback.classic.Level");
+            if (!logbackLoggerClass.isInstance(targetLogger)) {
+                return;
+            }
+            Object warnLevel = levelClass.getField("WARN").get(null);
+            logbackLoggerClass.getMethod("setLevel", 
levelClass).invoke(targetLogger, warnLevel);
+        } catch (Throwable t) {
+            LOG.debug("set logger level skipped for {}", loggerName, t);
+        }
+    }
+
+    /**
+     * Opens the table and initializes the Writer
+     */
+    public void open(String tableLocation, Map<String, String> options, 
String[] columnNames) throws Exception {
+        this.tableLocation = tableLocation;
+        Thread.currentThread().setContextClassLoader(classLoader);
+        paimonOptionParams = options.entrySet().stream()
+                .filter(kv -> kv.getKey().startsWith(PAIMON_OPTION_PREFIX))
+                .collect(Collectors
+                        .toMap(kv1 -> 
kv1.getKey().substring(PAIMON_OPTION_PREFIX.length()),
+                                kv1 -> kv1.getValue()));
+        hadoopOptionParams = options.entrySet().stream()
+                .filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX))
+                .collect(Collectors
+                        .toMap(kv1 -> 
kv1.getKey().substring(HADOOP_OPTION_PREFIX.length()),
+                                kv1 -> kv1.getValue()));
+        if (!paimonOptionParams.containsKey("warehouse") && tableLocation != 
null) {
+            paimonOptionParams.put("warehouse", tableLocation);
+        }
+        this.preExecutionAuthenticator = 
PreExecutionAuthenticatorCache.getAuthenticator(options);
+        preExecutionAuthenticator.execute(() -> {
+            try {
+                LOG.info("paimon: opening writer, location={}, opts_size={}", 
tableLocation, options.size());
+                Table table;
+                if (options.containsKey("serialized_table")) {
+                    table = 
PaimonUtils.deserialize(options.get("serialized_table"));
+                } else {
+                    Catalog catalog = createCatalog(paimonOptionParams, 
hadoopOptionParams);
+                    String dbName = options.getOrDefault("db_name", "default");
+                    String tblName = options.getOrDefault("table_name", 
"paimon_table");
+                    table = catalog.getTable(Identifier.create(dbName, 
tblName));
+                }
+                Map<String, String> dynamicOptions = new HashMap<>();
+                if (options.containsKey("write-buffer-size")) {
+                    dynamicOptions.put("write-buffer-size", 
options.get("write-buffer-size"));
+                }
+                copyIfPresent(options, dynamicOptions, 
"write-buffer-spillable");
+                copyIfPresent(options, dynamicOptions, 
"write-buffer-spill.max-disk-size");
+                copyIfPresent(options, dynamicOptions, 
"sort-spill-buffer-size");
+                copyIfPresent(options, dynamicOptions, "sort-spill-threshold");
+                copyIfPresent(options, dynamicOptions, "spill-compression");
+                boolean enableJniCompact = Boolean.parseBoolean(
+                        options.getOrDefault("paimon_use_jni_compact", 
"false"));
+                boolean enableInlineCompact = enableJniCompact;
+                if (enableJniCompact && table instanceof 
org.apache.paimon.table.FileStoreTable) {
+                    int tableBuckets = 
((org.apache.paimon.table.FileStoreTable) table).schema().numBuckets();
+                    enableInlineCompact = tableBuckets > 0;
+                }
+                if (enableInlineCompact) {
+                    LOG.info("paimon: enabling inline compaction for JNI 
writer");
+                    dynamicOptions.put("write-only", "false");
+                    dynamicOptions.put("num-sorted-run.compaction-trigger", 
"10");
+                    dynamicOptions.put("compaction.max.worker-num", "8");
+                } else {
+                    LOG.info("paimon: write-only mode, compaction disabled");
+                    dynamicOptions.put("write-only", "true");
+                }
+                table = table.copy(dynamicOptions);
+                LOG.info("paimon: applied dynamic options to table: {}", 
dynamicOptions);
+                this.paimonFields = table.rowType().getFields();
+                this.paimonFieldMap = new HashMap<>();
+                for (DataField f : this.paimonFields) {
+                    this.paimonFieldMap.put(f.name(), f);
+                }
+                this.targetTypes = buildTargetTypes(columnNames);
+                this.writer = table.newBatchWriteBuilder().newWrite();
+                if (table instanceof org.apache.paimon.table.FileStoreTable) {
+                    this.isDynamicBucketMode =
+                            ((org.apache.paimon.table.FileStoreTable) 
table).schema().numBuckets() == -1;
+                }
+                boolean spillEnabled = 
Boolean.parseBoolean(options.getOrDefault("write-buffer-spillable", "false"));
+                if (spillEnabled) {
+                    String spillDir = options.get("paimon_jni_spill_dir");
+                    if (spillDir != null && !spillDir.isEmpty()) {
+                        File spillFile = new File(spillDir);
+                        if (!spillFile.exists()) {
+                            spillFile.mkdirs();
+                        }
+                        this.ioManager = new IOManagerImpl(spillDir);
+                    } else {
+                        this.ioManager = new 
IOManagerImpl(System.getProperty("java.io.tmpdir"));
+                    }
+                    long globalPoolSize = Long.parseLong(options.getOrDefault(
+                            "paimon_global_memory_pool_size", "1073741824"));
+                    this.memorySegmentPool = new 
HeapMemorySegmentPool(globalPoolSize, 32 * 1024);
+                    
this.writer.withIOManager(ioManager).withMemoryPool(memorySegmentPool);
+                    LOG.info("paimon: spill enabled, spill_dir={}", spillDir);
+                }
+                return null;
+            } catch (Throwable t) {
+                throw contextException("open", "options_size=" + 
options.size(), t);
+            }
+        });
+    }
+
+    private void copyIfPresent(Map<String, String> options, Map<String, 
String> dynamicOptions, String key) {
+        String value = options.get(key);
+        if (value != null) {
+            dynamicOptions.put(key, value);
+        }
+    }
+
+    /**
+     * Receives Arrow IPC memory address from C++, deserializes and writes to 
Paimon
+     */
+    public void write(long address, int length) throws Exception {
+        preExecutionAuthenticator.execute(() -> {
+            try {
+                ByteBuffer directBuffer = getDirectBuffer(address, length);
+                try (ArrowStreamReader reader = new ArrowStreamReader(
+                        new DirectBufInputStream(directBuffer), allocator)) {
+                    VectorSchemaRoot root = reader.getVectorSchemaRoot();
+                    while (reader.loadNextBatch()) {
+                        writeBatch(root);
+                    }
+                }
+                return null;
+            } catch (Throwable t) {
+                throw contextException("write", "address=" + address + ", 
length=" + length, t);
+            }
+        });
+    }
+
+    private void writeBatch(VectorSchemaRoot root) throws Exception {
+        int rowCount = root.getRowCount();
+        if (rowCount == 0) {
+            return;
+        }
+        List<Field> fields = root.getSchema().getFields();
+        List<FieldVector> vectors = root.getFieldVectors();
+        int colCount = fields.size();
+        DataType[] currentTargetTypes = resolveTargetTypes(fields);
+        GenericRow reusedRow = new GenericRow(colCount);
+        for (int i = 0; i < rowCount; i++) {
+            for (int col = 0; col < colCount; col++) {
+                Field arrowField = fields.get(col);
+                try {
+                    reusedRow.setField(col, readArrowValue(vectors.get(col), 
i, arrowField, currentTargetTypes[col]));
+                } catch (Throwable t) {
+                    String fieldName = arrowField == null ? "null" : 
arrowField.getName();
+                    String currentTargetType = currentTargetTypes[col] == null
+                            ? "null"
+                            : currentTargetTypes[col].asSQLString();
+                    throw contextException("writeBatch.convert",
+                            "row=" + i + ", col=" + col + ", field=" + 
fieldName + ", targetType="
+                                    + currentTargetType + ", rowCount=" + 
rowCount + ", colCount=" + colCount,
+                            t);
+                }
+            }
+            try {
+                if (isDynamicBucketMode) {
+                    writer.write(reusedRow, 0);
+                } else {
+                    writer.write(reusedRow);
+                }
+            } catch (Throwable t) {
+                throw contextException("writeBatch.write",
+                        "row=" + i + ", rowCount=" + rowCount + ", colCount=" 
+ colCount,
+                        t);
+            }
+        }
+    }
+
+    private Object readArrowValue(FieldVector vector, int row, Field 
arrowField, DataType targetType) {
+        if (vector == null || vector.isNull(row)) {
+            return null;
+        }
+        if (targetType instanceof BinaryType || targetType instanceof 
VarBinaryType) {
+            if (vector instanceof VarBinaryVector) {
+                return ((VarBinaryVector) vector).get(row);
+            }
+            if (vector instanceof VarCharVector) {
+                return ((VarCharVector) vector).get(row);
+            }
+        } else {
+            if (vector instanceof VarCharVector) {
+                return BinaryString.fromBytes(((VarCharVector) 
vector).get(row));
+            }
+        }
+        if (vector instanceof BitVector) {
+            return ((BitVector) vector).get(row) == 1;
+        }
+        if (vector instanceof TinyIntVector) {
+            return ((TinyIntVector) vector).get(row);
+        }
+        if (vector instanceof SmallIntVector) {
+            return ((SmallIntVector) vector).get(row);
+        }
+        if (vector instanceof IntVector) {
+            return ((IntVector) vector).get(row);
+        }
+        if (vector instanceof BigIntVector) {
+            return ((BigIntVector) vector).get(row);
+        }
+        if (vector instanceof Float4Vector) {
+            return ((Float4Vector) vector).get(row);
+        }
+        if (vector instanceof Float8Vector) {
+            return ((Float8Vector) vector).get(row);
+        }
+        if (vector instanceof DateDayVector) {
+            return ((DateDayVector) vector).get(row);
+        }
+        if (vector instanceof TimeStampMicroVector) {
+            NullableTimeStampMicroHolder holder = new 
NullableTimeStampMicroHolder();
+            ((TimeStampMicroVector) vector).get(row, holder);
+            return Timestamp.fromMicros(holder.value);
+        }
+        if (vector instanceof DecimalVector) {
+            return readDecimal((DecimalVector) vector, row);
+        }
+        Object val = vector.getObject(row);
+        return convertToPaimonType(val, arrowField, targetType);
+    }
+
+    private Decimal readDecimal(DecimalVector vector, int row) {
+        BigDecimal bd = getBigDecimalFromArrowBuf(vector.getDataBuffer(), row,
+                vector.getScale(), DecimalVector.TYPE_WIDTH);
+        return Decimal.fromBigDecimal(bd, vector.getPrecision(), 
vector.getScale());
+    }
+
+    /**
+     * Recursively converts a Java object read from an Arrow vector into the value written for
+     * the Paimon {@code targetType}.
+     *
+     * <p>Rules are applied in this order; the first match wins:
+     * <ol>
+     *   <li>{@code null} stays {@code null}.</li>
+     *   <li>Binary / VarBinary targets always produce a {@code byte[]}.</li>
+     *   <li>Text-like values ({@code BinaryString}, {@code byte[]}, Arrow or Hadoop
+     *       {@code Text}, any {@code CharSequence}) become a {@code BinaryString}.</li>
+     *   <li>Date, timestamp and decimal values become the matching Paimon value.</li>
+     *   <li>Row, map and array targets are converted element by element.</li>
+     *   <li>Anything else is returned unchanged.</li>
+     * </ol>
+     *
+     * @param val        value to convert; may be {@code null}
+     * @param arrowField Arrow field describing {@code val}; may be {@code null}, in which case
+     *                   nested values are converted without Arrow type information
+     * @param targetType Paimon type the value is written as
+     * @return the converted value, or {@code val} itself when no rule applies
+     */
+    private Object convertToPaimonType(Object val, Field arrowField, DataType targetType) {
+        if (val == null) {
+            return null;
+        }
+
+        if (targetType instanceof BinaryType || targetType instanceof VarBinaryType) {
+            if (val instanceof byte[]) {
+                return val;
+            } else if (val instanceof BinaryString) {
+                return ((BinaryString) val).toBytes();
+            } else if (val instanceof org.apache.arrow.vector.util.Text) {
+                return ((org.apache.arrow.vector.util.Text) val).copyBytes();
+            } else if (val instanceof org.apache.hadoop.io.Text) {
+                // Copy only the first getLength() bytes of the Text's backing array.
+                org.apache.hadoop.io.Text t = (org.apache.hadoop.io.Text) val;
+                byte[] bytes = new byte[t.getLength()];
+                System.arraycopy(t.getBytes(), 0, bytes, 0, t.getLength());
+                return bytes;
+            } else {
+                // Also covers String, whose toString() is the string itself.
+                return val.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
+            }
+        }
+
+        if (val instanceof BinaryString) {
+            return val;
+        }
+        if (val instanceof byte[]) {
+            return BinaryString.fromBytes((byte[]) val);
+        }
+        if (val instanceof org.apache.arrow.vector.util.Text) {
+            return BinaryString.fromBytes(((org.apache.arrow.vector.util.Text) val).copyBytes());
+        }
+        if (val instanceof org.apache.hadoop.io.Text) {
+            org.apache.hadoop.io.Text t = (org.apache.hadoop.io.Text) val;
+            return BinaryString.fromBytes(t.getBytes(), 0, t.getLength());
+        }
+        if (val instanceof CharSequence) {
+            return BinaryString.fromString(val.toString());
+        }
+
+        ArrowType arrowType = arrowField != null ? arrowField.getType() : null;
+        ArrowType.ArrowTypeID typeID = arrowType != null ? arrowType.getTypeID() : null;
+
+        if (val instanceof LocalDateTime) {
+            return Timestamp.fromLocalDateTime((LocalDateTime) val);
+        }
+        if (val instanceof Long && typeID == ArrowType.ArrowTypeID.Timestamp) {
+            return Timestamp.fromMicros((Long) val);
+        }
+        if (val instanceof java.time.LocalDate) {
+            return (int) ((java.time.LocalDate) val).toEpochDay();
+        }
+        if (val instanceof Integer && typeID == ArrowType.ArrowTypeID.Date) {
+            return val;
+        }
+        if (val instanceof BigDecimal) {
+            // Precision and scale come from the value itself, not from targetType.
+            BigDecimal bd = (BigDecimal) val;
+            return Decimal.fromBigDecimal(bd, bd.precision(), bd.scale());
+        }
+        if (targetType instanceof RowType && val instanceof Map) {
+            RowType rowType = (RowType) targetType;
+            Map<?, ?> mapVal = (Map<?, ?>) val;
+            List<DataField> childFields = rowType.getFields();
+            GenericRow structRow = new GenericRow(childFields.size());
+
+            for (int i = 0; i < childFields.size(); i++) {
+                DataField childField = childFields.get(i);
+                Object childVal = mapVal.get(childField.name());
+
+                // Match the Arrow child to the Paimon field by name.
+                Field childArrowField = null;
+                if (arrowField != null && arrowField.getChildren() != null) {
+                    for (Field f : arrowField.getChildren()) {
+                        if (f.getName().equals(childField.name())) {
+                            childArrowField = f;
+                            break;
+                        }
+                    }
+                }
+                structRow.setField(i, convertToPaimonType(childVal, childArrowField, childField.type()));
+            }
+            return structRow;
+        }
+        if (targetType instanceof MapType && val instanceof List) {
+            MapType mapType = (MapType) targetType;
+            List<?> list = (List<?>) val;
+            Map<Object, Object> convertedMap = new HashMap<>();
+
+            // The map field's first child is the entries struct; its first two children are
+            // taken as the key and value fields.
+            Field keyArrowField = null;
+            Field valueArrowField = null;
+            if (arrowField != null && !arrowField.getChildren().isEmpty()) {
+                Field entriesStruct = arrowField.getChildren().get(0);
+                if (entriesStruct.getChildren().size() >= 2) {
+                    keyArrowField = entriesStruct.getChildren().get(0);
+                    valueArrowField = entriesStruct.getChildren().get(1);
+                }
+            }
+
+            String keyName = keyArrowField != null ? keyArrowField.getName() : "key";
+            String valueName = valueArrowField != null ? valueArrowField.getName() : "value";
+
+            for (Object element : list) {
+                // Arrow's inner entry struct is surfaced as a Java Map; other elements are skipped.
+                if (element instanceof Map) {
+                    Map<?, ?> kvStruct = (Map<?, ?>) element;
+                    Object k = convertToPaimonType(kvStruct.get(keyName), keyArrowField, mapType.getKeyType());
+                    Object v = convertToPaimonType(kvStruct.get(valueName), valueArrowField, mapType.getValueType());
+                    convertedMap.put(k, v);
+                }
+            }
+            return new GenericMap(convertedMap);
+        }
+        if (targetType instanceof ArrayType && val instanceof List) {
+            ArrayType arrayType = (ArrayType) targetType;
+            List<?> list = (List<?>) val;
+            Object[] convertedArray = new Object[list.size()];
+            Field childArrowField = (arrowField != null && !arrowField.getChildren().isEmpty())
+                    ? arrowField.getChildren().get(0) : null;
+
+            for (int i = 0; i < list.size(); i++) {
+                convertedArray[i] = convertToPaimonType(list.get(i), childArrowField, arrayType.getElementType());
+            }
+            return new GenericArray(convertedArray);
+        }
+
+        // No rule matched: pass the value through unchanged.
+        return val;
+    }
+
+    public byte[][] prepareCommit() throws Exception {
+        if (writer == null) {
+            return new byte[0][];
+        }
+        return preExecutionAuthenticator.execute(() -> {
+            try {
+                List<CommitMessage> messages = writer.prepareCommit();
+
+                if (messages == null || messages.isEmpty()) {
+                    LOG.info("paimon: prepareCommit returns empty, 
location={}", tableLocation);
+                    return new byte[0][];
+                }
+                LOG.info("paimon: prepareCommit returns {} messages", 
messages.size());
+                final int maxPayloadBytes = 8 * 1024 * 1024;
+                int chunkSize = 512;
+                java.util.ArrayList<byte[]> payloads = new 
java.util.ArrayList<>();
+                int i = 0;
+                while (i < messages.size()) {
+                    int end = Math.min(i + chunkSize, messages.size());
+                    DataOutputSerializer outputView = new 
DataOutputSerializer(1024);
+                    serializer.serializeList(messages.subList(i, end), 
outputView);
+                    byte[] data = outputView.getCopyOfBuffer();
+                    int len = data.length;
+                    int version = serializer.getVersion();
+                    byte[] payload = new byte[12 + len];
+                    payload[0] = 'D';
+                    payload[1] = 'P';
+                    payload[2] = 'C';
+                    payload[3] = 'M';
+                    payload[4] = (byte) ((version >>> 24) & 0xFF);
+                    payload[5] = (byte) ((version >>> 16) & 0xFF);
+                    payload[6] = (byte) ((version >>> 8) & 0xFF);
+                    payload[7] = (byte) (version & 0xFF);
+                    payload[8] = (byte) ((len >>> 24) & 0xFF);
+                    payload[9] = (byte) ((len >>> 16) & 0xFF);
+                    payload[10] = (byte) ((len >>> 8) & 0xFF);
+                    payload[11] = (byte) (len & 0xFF);
+                    System.arraycopy(data, 0, payload, 12, len);
+                    if (payload.length > maxPayloadBytes && chunkSize > 1) {
+                        chunkSize = Math.max(1, chunkSize / 2);
+                        continue;
+                    }
+                    payloads.add(payload);
+                    i = end;
+                }
+                return payloads.toArray(new byte[0][]);
+            } catch (Throwable t) {
+                throw contextException("prepareCommit", "tableLocation=" + 
tableLocation, t);
+            }
+        });
+    }
+
+    /**
+     * Best-effort abort: releases the writer resources by calling
+     * {@code closeWriterResources(false)} under the authenticator. Does nothing when no
+     * authenticator is set. A failure is logged and not rethrown.
+     */
+    public void abort() {
+        if (preExecutionAuthenticator == null) {
+            return;
+        }
+        try {
+            preExecutionAuthenticator.execute(() -> {
+                closeWriterResources(false);
+                return null;
+            });
+        } catch (Exception e) {
+            LOG.error("Failed to abort paimon writer", e);
+        }
+    }
+
+    public void close() {
+        try {
+            if (preExecutionAuthenticator != null) {
+                preExecutionAuthenticator.execute(() -> {
+                    closeWriterResources(true);
+                    return null;
+                });
+            }
+        } catch (Exception e) {
+            LOG.warn("Error while closing PaimonJniWriter", e);

Review Comment:
   This swallows failures from `closeWriterResources(true)`, so the JNI caller 
treats Java close as successful even if `BatchTableWrite.close()` fails. At 
this point BE may already have collected commit payloads, and FE can still 
commit the Paimon transaction with messages from a writer that did not close 
cleanly. Please propagate at least `writer.close()` failures back through 
JNI/C++ so the insert fails instead of committing a potentially incomplete 
write.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to