github-actions[bot] commented on code in PR #32306:
URL: https://github.com/apache/doris/pull/32306#discussion_r1527455212


##########
be/src/vec/sink/writer/vhive_partition_writer.h:
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/PlanNodes_types.h>

Review Comment:
   warning: 'gen_cpp/PlanNodes_types.h' file not found [clang-diagnostic-error]
   ```cpp
   #include <gen_cpp/PlanNodes_types.h>
            ^
   ```
   



##########
be/src/vec/sink/writer/vhive_partition_writer.h:
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/PlanNodes_types.h>
+
+#include "io/fs/file_writer.h"
+#include "vec/columns/column.h"
+#include "vec/exprs/vexpr_fwd.h"
+#include "vec/runtime/vfile_format_transformer.h"
+
+namespace doris {
+
+class ObjectPool;
+class RuntimeState;
+class RuntimeProfile;
+
+namespace vectorized {
+
+class Block;
+class VFileFormatTransformer;
+
+class VHivePartitionWriter {
+public:
+    struct WriteInfo {
+        std::string write_path;
+        std::string target_path;
+        TFileType::type file_type;
+    };
+
+    VHivePartitionWriter(const TDataSink& t_sink, const std::string partition_name,
+                         TUpdateMode::type update_mode, const VExprContextSPtrs& output_expr_ctxs,
+                         const std::vector<THiveColumn>& columns, WriteInfo write_info,
+                         const std::string file_name, TFileFormatType::type file_format_type,

Review Comment:
   warning: parameter 'file_name' is const-qualified in the function declaration; const-qualification of parameters only has an effect in function definitions [readability-avoid-const-params-in-decls]
   
   ```suggestion
                         std::string file_name, TFileFormatType::type file_format_type,
   ```
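For context on this check: top-level `const` on a by-value parameter is not part of the function's signature, so in a declaration it has no effect at all; it only matters in the definition, where it keeps the local copy immutable. A minimal sketch (the `greet` function is hypothetical, not from this PR):

```cpp
#include <cassert>
#include <string>

// Declaration: writing `const std::string name` here would change nothing,
// which is why clang-tidy suggests dropping the qualifier from declarations.
std::string greet(std::string name);

// Definition: const is meaningful here -- it forbids the body from
// modifying its local copy of the argument.
std::string greet(const std::string name) {
    // name += "!";  // would not compile: `name` is const in this definition
    return "hello " + name;
}
```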
   



##########
be/src/vec/sink/writer/vhive_partition_writer.h:
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/PlanNodes_types.h>
+
+#include "io/fs/file_writer.h"
+#include "vec/columns/column.h"
+#include "vec/exprs/vexpr_fwd.h"
+#include "vec/runtime/vfile_format_transformer.h"
+
+namespace doris {
+
+class ObjectPool;
+class RuntimeState;
+class RuntimeProfile;
+
+namespace vectorized {
+
+class Block;
+class VFileFormatTransformer;
+
+class VHivePartitionWriter {
+public:
+    struct WriteInfo {
+        std::string write_path;
+        std::string target_path;
+        TFileType::type file_type;
+    };
+
+    VHivePartitionWriter(const TDataSink& t_sink, const std::string partition_name,

Review Comment:
   warning: parameter 'partition_name' is const-qualified in the function declaration; const-qualification of parameters only has an effect in function definitions [readability-avoid-const-params-in-decls]
   
   ```suggestion
       VHivePartitionWriter(const TDataSink& t_sink, std::string partition_name,
   ```
   



##########
be/src/vec/sink/writer/vhive_partition_writer.h:
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/PlanNodes_types.h>
+
+#include "io/fs/file_writer.h"
+#include "vec/columns/column.h"
+#include "vec/exprs/vexpr_fwd.h"
+#include "vec/runtime/vfile_format_transformer.h"
+
+namespace doris {
+
+class ObjectPool;
+class RuntimeState;
+class RuntimeProfile;
+
+namespace vectorized {
+
+class Block;
+class VFileFormatTransformer;
+
+class VHivePartitionWriter {
+public:
+    struct WriteInfo {
+        std::string write_path;
+        std::string target_path;
+        TFileType::type file_type;
+    };
+
+    VHivePartitionWriter(const TDataSink& t_sink, const std::string partition_name,
+                         TUpdateMode::type update_mode, const VExprContextSPtrs& output_expr_ctxs,
+                         const std::vector<THiveColumn>& columns, WriteInfo write_info,
+                         const std::string file_name, TFileFormatType::type file_format_type,
+                         TFileCompressType::type hive_compress_type,
+                         const std::map<std::string, std::string>& hadoop_conf);

Review Comment:
   warning: parameter 10 is const-qualified in the function declaration; const-qualification of parameters only has an effect in function definitions [readability-avoid-const-params-in-decls]
   
   ```suggestion
                            std::map<std::string, std::string>& hadoop_conf);
   ```
   



##########
be/src/vec/sink/writer/vhive_table_writer.cpp:
##########
@@ -0,0 +1,432 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vhive_table_writer.h"
+
+#include "runtime/runtime_state.h"
+#include "vec/core/block.h"
+#include "vec/core/column_with_type_and_name.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
+#include "vec/sink/writer/vhive_partition_writer.h"
+#include "vec/sink/writer/vhive_utils.h"
+
+namespace doris {
+namespace vectorized {

Review Comment:
   warning: nested namespaces can be concatenated [modernize-concat-nested-namespaces]
   
   ```suggestion
   namespace doris::vectorized {
   ```
   
   be/src/vec/sink/writer/vhive_table_writer.cpp:430:
   ```diff
   - } // namespace vectorized
   - } // namespace doris
   + } // namespace doris
   ```
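Since C++17 the two nesting levels can be collapsed into a single nested-namespace definition, which is all this check asks for. A minimal sketch (the `answer` function is hypothetical):

```cpp
#include <cassert>

// Equivalent to `namespace doris { namespace vectorized { ... } }`,
// written as a single C++17 nested-namespace definition.
namespace doris::vectorized {
int answer() { return 42; }
} // namespace doris::vectorized
```

The closing-brace comment then also collapses to one line, which is what the accompanying diff at line 430 adjusts.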
   



##########
be/src/vec/sink/writer/vhive_table_writer.cpp:
##########
@@ -0,0 +1,432 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vhive_table_writer.h"
+
+#include "runtime/runtime_state.h"
+#include "vec/core/block.h"
+#include "vec/core/column_with_type_and_name.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
+#include "vec/sink/writer/vhive_partition_writer.h"
+#include "vec/sink/writer/vhive_utils.h"
+
+namespace doris {
+namespace vectorized {
+
+VHiveTableWriter::VHiveTableWriter(const TDataSink& t_sink,
+                                   const VExprContextSPtrs& output_expr_ctxs)
+        : AsyncResultWriter(output_expr_ctxs), _t_sink(t_sink) {
+    DCHECK(_t_sink.__isset.hive_table_sink);
+}
+
+Status VHiveTableWriter::init_properties(ObjectPool* pool) {
+    return Status::OK();
+}
+
+Status VHiveTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {
+    _state = state;
+    _profile = profile;
+
+    for (int i = 0; i < _t_sink.hive_table_sink.columns.size(); ++i) {
+        if (_t_sink.hive_table_sink.columns[i].column_type == THiveColumnType::PARTITION_KEY) {
+            _partition_columns_input_index.emplace_back(i);
+        }
+    }
+    return Status::OK();
+}
+
+Status VHiveTableWriter::write(vectorized::Block& block) {
+    std::unordered_map<std::shared_ptr<VHivePartitionWriter>, IColumn::Filter> writer_positions;
+
+    auto& hive_table_sink = _t_sink.hive_table_sink;
+
+    if (_partition_columns_input_index.empty()) {
+        auto writer_iter = _partitions_to_writers.find("");
+        if (writer_iter == _partitions_to_writers.end()) {
+            try {
+                std::shared_ptr<VHivePartitionWriter> writer = _create_partition_writer(block, -1);
+                _partitions_to_writers.insert({"", writer});
+                RETURN_IF_ERROR(writer->open(_state, _profile));
+                RETURN_IF_ERROR(writer->write(block));
+            } catch (doris::Exception& e) {
+                return e.to_status();
+            }
+            return Status::OK();
+        } else {
+            std::shared_ptr<VHivePartitionWriter> writer;
+            if (writer_iter->second->written_len() > config::hive_sink_max_file_size) {
+                static_cast<void>(writer_iter->second->close(Status::OK()));
+                _partitions_to_writers.erase(writer_iter);
+                try {
+                    writer = _create_partition_writer(block, -1);
+                    _partitions_to_writers.insert({"", writer});
+                    RETURN_IF_ERROR(writer->open(_state, _profile));
+                    RETURN_IF_ERROR(writer->write(block));
+                } catch (doris::Exception& e) {
+                    return e.to_status();
+                }
+            } else {
+                writer = writer_iter->second;
+            }
+            RETURN_IF_ERROR(writer->write(block));
+            return Status::OK();
+        }
+    }
+
+    for (int i = 0; i < block.rows(); ++i) {
+        std::vector<std::string> partition_values;
+        try {
+            partition_values = _create_partition_values(block, i);
+        } catch (doris::Exception& e) {
+            return e.to_status();
+        }
+        std::string partition_name = VHiveUtils::make_partition_name(
+                hive_table_sink.columns, _partition_columns_input_index, partition_values);
+
+        auto create_and_open_writer =
+                [&](const std::string& partition_name, int position,
+                    std::shared_ptr<VHivePartitionWriter>& writer_ptr) -> Status {
+            try {
+                auto writer = _create_partition_writer(block, position);
+                RETURN_IF_ERROR(writer->open(_state, _profile));
+                IColumn::Filter filter(block.rows(), 0);
+                filter[position] = 1;
+                writer_positions.insert({writer, std::move(filter)});
+                _partitions_to_writers.insert({partition_name, writer});
+                writer_ptr = writer;
+            } catch (doris::Exception& e) {
+                return e.to_status();
+            }
+            return Status::OK();
+        };
+
+        auto writer_iter = _partitions_to_writers.find(partition_name);
+        if (writer_iter == _partitions_to_writers.end()) {
+            std::shared_ptr<VHivePartitionWriter> writer;
+            RETURN_IF_ERROR(create_and_open_writer(partition_name, i, writer));
+        } else {
+            std::shared_ptr<VHivePartitionWriter> writer;
+            if (writer_iter->second->written_len() > config::hive_sink_max_file_size) {
+                static_cast<void>(writer_iter->second->close(Status::OK()));
+                writer_positions.erase(writer_iter->second);
+                _partitions_to_writers.erase(writer_iter);
+                RETURN_IF_ERROR(create_and_open_writer(partition_name, i, writer));
+            } else {
+                writer = writer_iter->second;
+            }
+            auto writer_pos_iter = writer_positions.find(writer);
+            if (writer_pos_iter == writer_positions.end()) {
+                IColumn::Filter filter(block.rows(), 0);
+                filter[i] = 1;
+                writer_positions.insert({writer, std::move(filter)});
+            } else {
+                writer_pos_iter->second[i] = 1;
+            }
+        }
+    }
+
+    for (auto it = writer_positions.begin(); it != writer_positions.end(); ++it) {
+        RETURN_IF_ERROR(it->first->write(block, &it->second));
+    }
+    return Status::OK();
+}
+
+Status VHiveTableWriter::close(Status status) {
+    for (const auto& pair : _partitions_to_writers) {
+        Status st = pair.second->close(status);
+        if (st != Status::OK()) {
+            LOG(WARNING) << fmt::format("Unsupported type for partition {}", st.to_string());
+            continue;
+        }
+    }
+    _partitions_to_writers.clear();
+    return Status::OK();
+}
+
+std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer(

Review Comment:
   warning: method '_create_partition_writer' can be made static [readability-convert-member-functions-to-static]
   
   be/src/vec/sink/writer/vhive_table_writer.h:52:
   ```diff
   -     std::shared_ptr<VHivePartitionWriter> _create_partition_writer(vectorized::Block& block,
   +     static std::shared_ptr<VHivePartitionWriter> _create_partition_writer(vectorized::Block& block,
   ```
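This particular suggestion looks like a false positive: the quoted definition reads instance members such as `_t_sink` and `_vec_output_expr_ctxs`, so it cannot actually be static. For a method that genuinely touches no instance state, the transformation clang-tidy wants is simply this (hypothetical `PathHelper` class, not from the PR):

```cpp
#include <cassert>
#include <string>

class PathHelper {
public:
    // Touches no member state, so declaring it static documents that fact
    // and lets callers invoke it without an instance.
    static std::string join(const std::string& base, const std::string& part) {
        return base + "/" + part;
    }
};
```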
   



##########
be/src/vec/sink/writer/vhive_table_writer.cpp:
##########
@@ -0,0 +1,432 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vhive_table_writer.h"
+
+#include "runtime/runtime_state.h"
+#include "vec/core/block.h"
+#include "vec/core/column_with_type_and_name.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
+#include "vec/sink/writer/vhive_partition_writer.h"
+#include "vec/sink/writer/vhive_utils.h"
+
+namespace doris {
+namespace vectorized {
+
+VHiveTableWriter::VHiveTableWriter(const TDataSink& t_sink,
+                                   const VExprContextSPtrs& output_expr_ctxs)
+        : AsyncResultWriter(output_expr_ctxs), _t_sink(t_sink) {
+    DCHECK(_t_sink.__isset.hive_table_sink);
+}
+
+Status VHiveTableWriter::init_properties(ObjectPool* pool) {

Review Comment:
   warning: method 'init_properties' can be made static [readability-convert-member-functions-to-static]
   
   be/src/vec/sink/writer/vhive_table_writer.h:43:
   ```diff
   -     Status init_properties(ObjectPool* pool);
   +     static Status init_properties(ObjectPool* pool);
   ```
   



##########
be/test/vec/exec/skewed_partition_rebalancer_test.cpp:
##########
@@ -0,0 +1,318 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is porting from
+// https://github.com/trinodb/trino/blob/master/core/trino-main/src/test/java/io/trino/operator/output/TestSkewedPartitionRebalancer.java
+// to cpp and modified by Doris
+
+#include "vec/exec/skewed_partition_rebalancer.h"
+
+#include <gtest/gtest.h>

Review Comment:
   warning: 'gtest/gtest.h' file not found [clang-diagnostic-error]
   ```cpp
   #include <gtest/gtest.h>
            ^
   ```
   



##########
be/src/vec/sink/writer/vhive_utils.cpp:
##########
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vhive_utils.h"
+
+#include <algorithm>
+#include <regex>
+#include <sstream>
+
+namespace doris {
+namespace vectorized {

Review Comment:
   warning: nested namespaces can be concatenated [modernize-concat-nested-namespaces]
   
   ```suggestion
   namespace doris::vectorized {
   ```
   
   be/src/vec/sink/writer/vhive_utils.cpp:76:
   ```diff
   - } // namespace vectorized
   - } // namespace doris
   + } // namespace doris
   ```
   



##########
be/src/vec/sink/writer/vhive_table_writer.cpp:
##########
@@ -0,0 +1,432 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vhive_table_writer.h"
+
+#include "runtime/runtime_state.h"
+#include "vec/core/block.h"
+#include "vec/core/column_with_type_and_name.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
+#include "vec/sink/writer/vhive_partition_writer.h"
+#include "vec/sink/writer/vhive_utils.h"
+
+namespace doris {
+namespace vectorized {
+
+VHiveTableWriter::VHiveTableWriter(const TDataSink& t_sink,
+                                   const VExprContextSPtrs& output_expr_ctxs)
+        : AsyncResultWriter(output_expr_ctxs), _t_sink(t_sink) {
+    DCHECK(_t_sink.__isset.hive_table_sink);
+}
+
+Status VHiveTableWriter::init_properties(ObjectPool* pool) {
+    return Status::OK();
+}
+
+Status VHiveTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {
+    _state = state;
+    _profile = profile;
+
+    for (int i = 0; i < _t_sink.hive_table_sink.columns.size(); ++i) {
+        if (_t_sink.hive_table_sink.columns[i].column_type == THiveColumnType::PARTITION_KEY) {
+            _partition_columns_input_index.emplace_back(i);
+        }
+    }
+    return Status::OK();
+}
+
+Status VHiveTableWriter::write(vectorized::Block& block) {
+    std::unordered_map<std::shared_ptr<VHivePartitionWriter>, IColumn::Filter> writer_positions;
+
+    auto& hive_table_sink = _t_sink.hive_table_sink;
+
+    if (_partition_columns_input_index.empty()) {
+        auto writer_iter = _partitions_to_writers.find("");
+        if (writer_iter == _partitions_to_writers.end()) {
+            try {
+                std::shared_ptr<VHivePartitionWriter> writer = _create_partition_writer(block, -1);
+                _partitions_to_writers.insert({"", writer});
+                RETURN_IF_ERROR(writer->open(_state, _profile));
+                RETURN_IF_ERROR(writer->write(block));
+            } catch (doris::Exception& e) {
+                return e.to_status();
+            }
+            return Status::OK();
+        } else {
+            std::shared_ptr<VHivePartitionWriter> writer;
+            if (writer_iter->second->written_len() > config::hive_sink_max_file_size) {
+                static_cast<void>(writer_iter->second->close(Status::OK()));
+                _partitions_to_writers.erase(writer_iter);
+                try {
+                    writer = _create_partition_writer(block, -1);
+                    _partitions_to_writers.insert({"", writer});
+                    RETURN_IF_ERROR(writer->open(_state, _profile));
+                    RETURN_IF_ERROR(writer->write(block));
+                } catch (doris::Exception& e) {
+                    return e.to_status();
+                }
+            } else {
+                writer = writer_iter->second;
+            }
+            RETURN_IF_ERROR(writer->write(block));
+            return Status::OK();
+        }
+    }
+
+    for (int i = 0; i < block.rows(); ++i) {
+        std::vector<std::string> partition_values;
+        try {
+            partition_values = _create_partition_values(block, i);
+        } catch (doris::Exception& e) {
+            return e.to_status();
+        }
+        std::string partition_name = VHiveUtils::make_partition_name(
+                hive_table_sink.columns, _partition_columns_input_index, partition_values);
+
+        auto create_and_open_writer =
+                [&](const std::string& partition_name, int position,
+                    std::shared_ptr<VHivePartitionWriter>& writer_ptr) -> Status {
+            try {
+                auto writer = _create_partition_writer(block, position);
+                RETURN_IF_ERROR(writer->open(_state, _profile));
+                IColumn::Filter filter(block.rows(), 0);
+                filter[position] = 1;
+                writer_positions.insert({writer, std::move(filter)});
+                _partitions_to_writers.insert({partition_name, writer});
+                writer_ptr = writer;
+            } catch (doris::Exception& e) {
+                return e.to_status();
+            }
+            return Status::OK();
+        };
+
+        auto writer_iter = _partitions_to_writers.find(partition_name);
+        if (writer_iter == _partitions_to_writers.end()) {
+            std::shared_ptr<VHivePartitionWriter> writer;
+            RETURN_IF_ERROR(create_and_open_writer(partition_name, i, writer));
+        } else {
+            std::shared_ptr<VHivePartitionWriter> writer;
+            if (writer_iter->second->written_len() > config::hive_sink_max_file_size) {
+                static_cast<void>(writer_iter->second->close(Status::OK()));
+                writer_positions.erase(writer_iter->second);
+                _partitions_to_writers.erase(writer_iter);
+                RETURN_IF_ERROR(create_and_open_writer(partition_name, i, writer));
+            } else {
+                writer = writer_iter->second;
+            }
+            auto writer_pos_iter = writer_positions.find(writer);
+            if (writer_pos_iter == writer_positions.end()) {
+                IColumn::Filter filter(block.rows(), 0);
+                filter[i] = 1;
+                writer_positions.insert({writer, std::move(filter)});
+            } else {
+                writer_pos_iter->second[i] = 1;
+            }
+        }
+    }
+
+    for (auto it = writer_positions.begin(); it != writer_positions.end(); ++it) {
+        RETURN_IF_ERROR(it->first->write(block, &it->second));
+    }
+    return Status::OK();
+}
+
+Status VHiveTableWriter::close(Status status) {
+    for (const auto& pair : _partitions_to_writers) {
+        Status st = pair.second->close(status);
+        if (st != Status::OK()) {
+            LOG(WARNING) << fmt::format("Unsupported type for partition {}", st.to_string());
+            continue;
+        }
+    }
+    _partitions_to_writers.clear();
+    return Status::OK();
+}
+
+std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer(
+        vectorized::Block& block, int position) {
+    auto& hive_table_sink = _t_sink.hive_table_sink;
+    std::vector<std::string> partition_values;
+    std::string partition_name;
+    if (!_partition_columns_input_index.empty()) {
+        partition_values = _create_partition_values(block, position);
+        partition_name = VHiveUtils::make_partition_name(
+                hive_table_sink.columns, _partition_columns_input_index, partition_values);
+    }
+    const std::vector<THivePartition>& partitions = hive_table_sink.partitions;
+    const THiveLocationParams& write_location = hive_table_sink.location;
+    const THivePartition* existing_partition = nullptr;
+    bool existing_table = true;
+    for (const auto& partition : partitions) {
+        if (partition_values == partition.values) {
+            existing_partition = &partition;
+            break;
+        }
+    }
+    TUpdateMode::type update_mode;
+    VHivePartitionWriter::WriteInfo write_info;
+    TFileFormatType::type file_format_type;
+    TFileCompressType::type write_compress_type;
+    if (existing_partition == nullptr) { // new partition
+        if (existing_table == false) {   // new table
+            update_mode = TUpdateMode::NEW;
+            if (_partition_columns_input_index.empty()) { // new unpartitioned table
+                write_info = {write_location.write_path, write_location.target_path,
+                              write_location.file_type};
+            } else { // a new partition in a new partitioned table
+                auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
+                auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
+                write_info = {std::move(write_path), std::move(target_path),
+                              write_location.file_type};
+            }
+        } else { // a new partition in an existing partitioned table, or an existing unpartitioned table
+            if (_partition_columns_input_index.empty()) { // an existing unpartitioned table
+                update_mode =
+                        !hive_table_sink.overwrite ? TUpdateMode::APPEND : TUpdateMode::OVERWRITE;
+                write_info = {write_location.write_path, write_location.target_path,
+                              write_location.file_type};
+            } else { // a new partition in an existing partitioned table
+                update_mode = TUpdateMode::NEW;
+                auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
+                auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
+                write_info = {std::move(write_path), std::move(target_path),
+                              write_location.file_type};
+            }
+            // need to get schema from existing table ?
+        }
+        file_format_type = hive_table_sink.file_format;
+        write_compress_type = hive_table_sink.compression_type;
+    } else { // existing partition
+        if (!hive_table_sink.overwrite) {
+            update_mode = TUpdateMode::APPEND;
+            auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
+            auto target_path = fmt::format("{}", existing_partition->location.target_path);
+            write_info = {std::move(write_path), std::move(target_path),
+                          existing_partition->location.file_type};
+            file_format_type = existing_partition->file_format;
+            write_compress_type = hive_table_sink.compression_type;
+        } else {
+            update_mode = TUpdateMode::OVERWRITE;
+            auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
+            auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
+            write_info = {std::move(write_path), std::move(target_path), write_location.file_type};
+            file_format_type = hive_table_sink.file_format;
+            write_compress_type = hive_table_sink.compression_type;
+            // need to get schema from existing table ?
+        }
+    }
+
+    return std::make_shared<VHivePartitionWriter>(
+            _t_sink, std::move(partition_name), update_mode, _vec_output_expr_ctxs,
+            hive_table_sink.columns, std::move(write_info),
+            fmt::format("{}{}", _compute_file_name(),
+                        _get_file_extension(file_format_type, write_compress_type)),
+            file_format_type, write_compress_type, hive_table_sink.hadoop_config);
+}
+
+std::vector<std::string> VHiveTableWriter::_create_partition_values(vectorized::Block& block,
+                                                                    int position) {
+    std::vector<std::string> partition_values;
+    for (int i = 0; i < _partition_columns_input_index.size(); ++i) {
+        int partition_column_idx = _partition_columns_input_index[i];
+        vectorized::ColumnWithTypeAndName partition_column =
+                block.get_by_position(partition_column_idx);
+        std::string value =
+                _to_partition_value(_vec_output_expr_ctxs[partition_column_idx]->root()->type(),
+                                    partition_column, position);
+
+        // Check if value contains only printable ASCII characters
+        bool isValid = true;
+        for (char c : value) {
+            if (c < 0x20 || c > 0x7E) {
+                isValid = false;
+                break;
+            }
+        }
+
+        if (!isValid) {
+            // Encode value using Base16 encoding with space separator
+            std::stringstream encoded;
+            for (unsigned char c : value) {
+                encoded << std::hex << std::setw(2) << std::setfill('0') << (int)c;
+                encoded << " ";
+            }
+            throw doris::Exception(
+                    doris::ErrorCode::INTERNAL_ERROR,
+                    "Hive partition values can only contain printable ASCII characters (0x20 - "
+                    "0x7E). Invalid value: {}",
+                    encoded.str());
+        }
+
+        partition_values.emplace_back(value);
+    }
+
+    return partition_values;
+}
+
+std::string VHiveTableWriter::_to_partition_value(const TypeDescriptor& type_desc,

Review Comment:
   warning: function '_to_partition_value' exceeds recommended size/complexity thresholds [readability-function-size]
   ```cpp
   std::string VHiveTableWriter::_to_partition_value(const TypeDescriptor& type_desc,
                                 ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/vec/sink/writer/vhive_table_writer.cpp:281:** 97 lines including whitespace and comments (threshold 80)
   ```cpp
   std::string VHiveTableWriter::_to_partition_value(const TypeDescriptor& type_desc,
                                 ^
   ```
   
   </details>
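   The 97-line `_to_partition_value` flagged above is essentially one large switch over the column type. A common way to get under the threshold is to move each type's formatting into its own helper and keep the dispatcher flat — a minimal sketch with invented names, using a toy tagged value in place of Doris's `TypeDescriptor`:

   ```cpp
   #include <cassert>
   #include <string>

   // Toy stand-in for a typed partition value (the real code reads a Block column).
   enum class PartType { BOOLEAN, BIGINT, STRING };
   struct PartValue {
       PartType type;
       bool b = false;
       long long i = 0;
       std::string s;
   };

   namespace {
   // Each helper holds one case's formatting logic and is independently testable.
   std::string format_boolean(bool v) { return v ? "true" : "false"; }
   std::string format_bigint(long long v) { return std::to_string(v); }
   std::string format_string(const std::string& v) { return v; }
   } // namespace

   // The dispatcher stays a handful of lines regardless of how many types exist.
   std::string to_partition_value(const PartValue& v) {
       switch (v.type) {
       case PartType::BOOLEAN: return format_boolean(v.b);
       case PartType::BIGINT: return format_bigint(v.i);
       case PartType::STRING: return format_string(v.s);
       }
       return {};
   }
   ```

   Splitting per-type cases out also keeps each future type addition a local change rather than more growth in one function.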
   



##########
be/src/vec/sink/writer/vhive_table_writer.cpp:
##########
@@ -0,0 +1,432 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vhive_table_writer.h"
+
+#include "runtime/runtime_state.h"
+#include "vec/core/block.h"
+#include "vec/core/column_with_type_and_name.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
+#include "vec/sink/writer/vhive_partition_writer.h"
+#include "vec/sink/writer/vhive_utils.h"
+
+namespace doris {
+namespace vectorized {
+
+VHiveTableWriter::VHiveTableWriter(const TDataSink& t_sink,
+                                   const VExprContextSPtrs& output_expr_ctxs)
+        : AsyncResultWriter(output_expr_ctxs), _t_sink(t_sink) {
+    DCHECK(_t_sink.__isset.hive_table_sink);
+}
+
+Status VHiveTableWriter::init_properties(ObjectPool* pool) {
+    return Status::OK();
+}
+
+Status VHiveTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {
+    _state = state;
+    _profile = profile;
+
+    for (int i = 0; i < _t_sink.hive_table_sink.columns.size(); ++i) {
+        if (_t_sink.hive_table_sink.columns[i].column_type == THiveColumnType::PARTITION_KEY) {
+            _partition_columns_input_index.emplace_back(i);
+        }
+    }
+    return Status::OK();
+}
+
+Status VHiveTableWriter::write(vectorized::Block& block) {

Review Comment:
   warning: function 'write' has cognitive complexity of 94 (threshold 50) [readability-function-cognitive-complexity]
   ```cpp
   Status VHiveTableWriter::write(vectorized::Block& block) {
                            ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/vec/sink/writer/vhive_table_writer.cpp:57:** +1, including nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (_partition_columns_input_index.empty()) {
       ^
   ```
   **be/src/vec/sink/writer/vhive_table_writer.cpp:59:** +2, including nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (writer_iter == _partitions_to_writers.end()) {
           ^
   ```
   **be/src/vec/sink/writer/vhive_table_writer.cpp:63:** +3, including nesting penalty of 2, nesting level increased to 3
   ```cpp
                   RETURN_IF_ERROR(writer->open(_state, _profile));
                   ^
   ```
   **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/vec/sink/writer/vhive_table_writer.cpp:63:** +4, including nesting penalty of 3, nesting level increased to 4
   ```cpp
                   RETURN_IF_ERROR(writer->open(_state, _profile));
                   ^
   ```
   **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/vec/sink/writer/vhive_table_writer.cpp:64:** +3, including nesting penalty of 2, nesting level increased to 3
   ```cpp
                   RETURN_IF_ERROR(writer->write(block));
                   ^
   ```
   **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/vec/sink/writer/vhive_table_writer.cpp:64:** +4, including nesting penalty of 3, nesting level increased to 4
   ```cpp
                   RETURN_IF_ERROR(writer->write(block));
                   ^
   ```
   **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/vec/sink/writer/vhive_table_writer.cpp:65:** +3, including nesting penalty of 2, nesting level increased to 3
   ```cpp
               } catch (doris::Exception& e) {
                 ^
   ```
   **be/src/vec/sink/writer/vhive_table_writer.cpp:69:** +1, nesting level increased to 2
   ```cpp
           } else {
             ^
   ```
   **be/src/vec/sink/writer/vhive_table_writer.cpp:71:** +3, including nesting penalty of 2, nesting level increased to 3
   ```cpp
               if (writer_iter->second->written_len() > config::hive_sink_max_file_size) {
               ^
   ```
   **be/src/vec/sink/writer/vhive_table_writer.cpp:77:** +4, including nesting penalty of 3, nesting level increased to 4
   ```cpp
                       RETURN_IF_ERROR(writer->open(_state, _profile));
                       ^
   ```
   **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/vec/sink/writer/vhive_table_writer.cpp:77:** +5, including nesting penalty of 4, nesting level increased to 5
   ```cpp
                       RETURN_IF_ERROR(writer->open(_state, _profile));
                       ^
   ```
   **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/vec/sink/writer/vhive_table_writer.cpp:78:** +4, including nesting penalty of 3, nesting level increased to 4
   ```cpp
                       RETURN_IF_ERROR(writer->write(block));
                       ^
   ```
   **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/vec/sink/writer/vhive_table_writer.cpp:78:** +5, including nesting penalty of 4, nesting level increased to 5
   ```cpp
                       RETURN_IF_ERROR(writer->write(block));
                       ^
   ```
   **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/vec/sink/writer/vhive_table_writer.cpp:79:** +4, including nesting penalty of 3, nesting level increased to 4
   ```cpp
                   } catch (doris::Exception& e) {
                     ^
   ```
   **be/src/vec/sink/writer/vhive_table_writer.cpp:82:** +1, nesting level increased to 3
   ```cpp
               } else {
                 ^
   ```
   **be/src/vec/sink/writer/vhive_table_writer.cpp:85:** +3, including nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(writer->write(block));
               ^
   ```
   **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/vec/sink/writer/vhive_table_writer.cpp:85:** +4, including nesting penalty of 3, nesting level increased to 4
   ```cpp
               RETURN_IF_ERROR(writer->write(block));
               ^
   ```
   **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/vec/sink/writer/vhive_table_writer.cpp:90:** +1, including nesting penalty of 0, nesting level increased to 1
   ```cpp
       for (int i = 0; i < block.rows(); ++i) {
       ^
   ```
   **be/src/vec/sink/writer/vhive_table_writer.cpp:94:** +2, including nesting penalty of 1, nesting level increased to 2
   ```cpp
           } catch (doris::Exception& e) {
             ^
   ```
   **be/src/vec/sink/writer/vhive_table_writer.cpp:101:** nesting level increased to 2
   ```cpp
                   [&](const std::string& partition_name, int position,
                   ^
   ```
   **be/src/vec/sink/writer/vhive_table_writer.cpp:105:** +3, including nesting penalty of 2, nesting level increased to 3
   ```cpp
                   RETURN_IF_ERROR(writer->open(_state, _profile));
                   ^
   ```
   **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/vec/sink/writer/vhive_table_writer.cpp:105:** +4, including nesting penalty of 3, nesting level increased to 4
   ```cpp
                   RETURN_IF_ERROR(writer->open(_state, _profile));
                   ^
   ```
   **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/vec/sink/writer/vhive_table_writer.cpp:111:** +3, including nesting penalty of 2, nesting level increased to 3
   ```cpp
               } catch (doris::Exception& e) {
                 ^
   ```
   **be/src/vec/sink/writer/vhive_table_writer.cpp:118:** +2, including nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (writer_iter == _partitions_to_writers.end()) {
           ^
   ```
   **be/src/vec/sink/writer/vhive_table_writer.cpp:120:** +3, including nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(create_and_open_writer(partition_name, i, writer));
               ^
   ```
   **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/vec/sink/writer/vhive_table_writer.cpp:120:** +4, including nesting penalty of 3, nesting level increased to 4
   ```cpp
               RETURN_IF_ERROR(create_and_open_writer(partition_name, i, writer));
               ^
   ```
   **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/vec/sink/writer/vhive_table_writer.cpp:121:** +1, nesting level increased to 2
   ```cpp
           } else {
             ^
   ```
   **be/src/vec/sink/writer/vhive_table_writer.cpp:123:** +3, including nesting penalty of 2, nesting level increased to 3
   ```cpp
               if (writer_iter->second->written_len() > config::hive_sink_max_file_size) {
               ^
   ```
   **be/src/vec/sink/writer/vhive_table_writer.cpp:127:** +4, including nesting penalty of 3, nesting level increased to 4
   ```cpp
                   RETURN_IF_ERROR(create_and_open_writer(partition_name, i, writer));
                   ^
   ```
   **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/vec/sink/writer/vhive_table_writer.cpp:127:** +5, including nesting penalty of 4, nesting level increased to 5
   ```cpp
                   RETURN_IF_ERROR(create_and_open_writer(partition_name, i, writer));
                   ^
   ```
   **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/vec/sink/writer/vhive_table_writer.cpp:128:** +1, nesting level increased to 3
   ```cpp
               } else {
                 ^
   ```
   **be/src/vec/sink/writer/vhive_table_writer.cpp:132:** +3, including nesting penalty of 2, nesting level increased to 3
   ```cpp
               if (writer_pos_iter == writer_positions.end()) {
               ^
   ```
   **be/src/vec/sink/writer/vhive_table_writer.cpp:136:** +1, nesting level increased to 3
   ```cpp
               } else {
                 ^
   ```
   
   </details>
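   Much of the complexity tallied above comes from the duplicated "look up the partition's writer, roll it over if the current file is too large, otherwise reuse it" branches. One refactor that would remove several nesting levels is to centralize that decision in a single helper — a hedged sketch with invented names (`WriterPool`, `FakeWriter`, `get_or_roll`), not the Doris API:

   ```cpp
   #include <cassert>
   #include <memory>
   #include <string>
   #include <unordered_map>

   // Toy writer that only tracks how many bytes its "file" holds.
   struct FakeWriter {
       size_t written = 0;
   };

   class WriterPool {
   public:
       explicit WriterPool(size_t max_file_size) : _max(max_file_size) {}

       // Returns the open writer for `partition`, replacing it once it has grown
       // past the size threshold. All rollover branching lives here, so each
       // call site in write() collapses to a single call.
       std::shared_ptr<FakeWriter> get_or_roll(const std::string& partition) {
           auto it = _writers.find(partition);
           if (it == _writers.end() || it->second->written > _max) {
               auto w = std::make_shared<FakeWriter>();
               _writers[partition] = w; // inserts or replaces the rolled-over writer
               return w;
           }
           return it->second;
       }

   private:
       size_t _max;
       std::unordered_map<std::string, std::shared_ptr<FakeWriter>> _writers;
   };
   ```

   In the real code the helper would also close the old writer and open the new one, keeping the error handling in one place instead of four.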
   



##########
be/src/vec/sink/writer/vhive_utils.h:
##########
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/DataSinks_types.h>

Review Comment:
   warning: 'gen_cpp/DataSinks_types.h' file not found [clang-diagnostic-error]
   ```cpp
   #include <gen_cpp/DataSinks_types.h>
            ^
   ```
   



##########
be/src/vec/sink/writer/vhive_table_writer.h:
##########
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/DataSinks_types.h>
+
+#include "vec/exprs/vexpr_fwd.h"
+#include "vec/sink/writer/async_result_writer.h"
+
+namespace doris {
+
+class ObjectPool;
+class RuntimeState;
+class RuntimeProfile;
+struct TypeDescriptor;
+
+namespace vectorized {
+
+class Block;
+class VHivePartitionWriter;
+struct ColumnWithTypeAndName;
+
+class VHiveTableWriter final : public AsyncResultWriter {
+public:
+    VHiveTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs);
+
+    ~VHiveTableWriter() = default;

Review Comment:
   warning: annotate this function with 'override' or (rarely) 'final' [modernize-use-override]
   
   ```suggestion
       ~VHiveTableWriter() override = default;
   ```
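   For context on the suggestion: `override` on a destructor is purely a compile-time check that the base class destructor really is virtual; behavior is unchanged. A small self-contained illustration (not Doris code):

   ```cpp
   #include <cassert>
   #include <memory>

   struct Base {
       virtual ~Base() = default; // virtual, so deleting through Base* is safe
   };

   struct Derived final : Base {
       explicit Derived(bool* flag) : _flag(flag) {}
       // `override` makes the compiler reject this declaration if ~Base()
       // were ever changed to be non-virtual.
       ~Derived() override { *_flag = true; }
       bool* _flag;
   };
   ```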
   



##########
be/src/vec/sink/writer/vhive_partition_writer.h:
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/PlanNodes_types.h>
+
+#include "io/fs/file_writer.h"
+#include "vec/columns/column.h"
+#include "vec/exprs/vexpr_fwd.h"
+#include "vec/runtime/vfile_format_transformer.h"
+
+namespace doris {
+
+class ObjectPool;
+class RuntimeState;
+class RuntimeProfile;
+
+namespace vectorized {
+
+class Block;
+class VFileFormatTransformer;
+
+class VHivePartitionWriter {
+public:
+    struct WriteInfo {
+        std::string write_path;
+        std::string target_path;
+        TFileType::type file_type;
+    };
+
+    VHivePartitionWriter(const TDataSink& t_sink, const std::string partition_name,
+                         TUpdateMode::type update_mode, const VExprContextSPtrs& output_expr_ctxs,
+                         const std::vector<THiveColumn>& columns, WriteInfo write_info,
+                         const std::string file_name, TFileFormatType::type file_format_type,
+                         TFileCompressType::type hive_compress_type,
+                         const std::map<std::string, std::string>& hadoop_conf);
+
+    Status init_properties(ObjectPool* pool) { return Status::OK(); }

Review Comment:
   warning: method 'init_properties' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
       static Status init_properties(ObjectPool* pool) { return Status::OK(); }
   ```
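   The check fires because the method reads no instance state. Making it `static` documents that fact and lets callers invoke it without an object; the trade-off is that a static member can no longer be made virtual later. A toy example with invented names, not the Doris class:

   ```cpp
   #include <cassert>
   #include <string>

   class Config {
   public:
       // Touches no data members, so it qualifies for `static`.
       static bool is_valid_key(const std::string& key) { return !key.empty(); }
   };
   ```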
   



##########
be/src/vec/sink/writer/vhive_table_writer.h:
##########
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/DataSinks_types.h>

Review Comment:
   warning: 'gen_cpp/DataSinks_types.h' file not found [clang-diagnostic-error]
   ```cpp
   #include <gen_cpp/DataSinks_types.h>
            ^
   ```
   



##########
be/src/vec/sink/writer/vhive_table_writer.cpp:
##########
@@ -0,0 +1,432 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vhive_table_writer.h"
+
+#include "runtime/runtime_state.h"
+#include "vec/core/block.h"
+#include "vec/core/column_with_type_and_name.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
+#include "vec/sink/writer/vhive_partition_writer.h"
+#include "vec/sink/writer/vhive_utils.h"
+
+namespace doris {
+namespace vectorized {
+
+VHiveTableWriter::VHiveTableWriter(const TDataSink& t_sink,
+                                   const VExprContextSPtrs& output_expr_ctxs)
+        : AsyncResultWriter(output_expr_ctxs), _t_sink(t_sink) {
+    DCHECK(_t_sink.__isset.hive_table_sink);
+}
+
+Status VHiveTableWriter::init_properties(ObjectPool* pool) {
+    return Status::OK();
+}
+
+Status VHiveTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {
+    _state = state;
+    _profile = profile;
+
+    for (int i = 0; i < _t_sink.hive_table_sink.columns.size(); ++i) {
+        if (_t_sink.hive_table_sink.columns[i].column_type == THiveColumnType::PARTITION_KEY) {
+            _partition_columns_input_index.emplace_back(i);
+        }
+    }
+    return Status::OK();
+}
+
+Status VHiveTableWriter::write(vectorized::Block& block) {
+    std::unordered_map<std::shared_ptr<VHivePartitionWriter>, IColumn::Filter> writer_positions;
+
+    auto& hive_table_sink = _t_sink.hive_table_sink;
+
+    if (_partition_columns_input_index.empty()) {
+        auto writer_iter = _partitions_to_writers.find("");
+        if (writer_iter == _partitions_to_writers.end()) {
+            try {
+                std::shared_ptr<VHivePartitionWriter> writer = _create_partition_writer(block, -1);
+                _partitions_to_writers.insert({"", writer});
+                RETURN_IF_ERROR(writer->open(_state, _profile));
+                RETURN_IF_ERROR(writer->write(block));
+            } catch (doris::Exception& e) {
+                return e.to_status();
+            }
+            return Status::OK();
+        } else {
+            std::shared_ptr<VHivePartitionWriter> writer;
+            if (writer_iter->second->written_len() > config::hive_sink_max_file_size) {
+                static_cast<void>(writer_iter->second->close(Status::OK()));
+                _partitions_to_writers.erase(writer_iter);
+                try {
+                    writer = _create_partition_writer(block, -1);
+                    _partitions_to_writers.insert({"", writer});
+                    RETURN_IF_ERROR(writer->open(_state, _profile));
+                    RETURN_IF_ERROR(writer->write(block));
+                } catch (doris::Exception& e) {
+                    return e.to_status();
+                }
+            } else {
+                writer = writer_iter->second;
+            }
+            RETURN_IF_ERROR(writer->write(block));
+            return Status::OK();
+        }
+    }
+
+    for (int i = 0; i < block.rows(); ++i) {
+        std::vector<std::string> partition_values;
+        try {
+            partition_values = _create_partition_values(block, i);
+        } catch (doris::Exception& e) {
+            return e.to_status();
+        }
+        std::string partition_name = VHiveUtils::make_partition_name(
+                hive_table_sink.columns, _partition_columns_input_index, partition_values);
+
+        auto create_and_open_writer =
+                [&](const std::string& partition_name, int position,
+                    std::shared_ptr<VHivePartitionWriter>& writer_ptr) -> Status {
+            try {
+                auto writer = _create_partition_writer(block, position);
+                RETURN_IF_ERROR(writer->open(_state, _profile));
+                IColumn::Filter filter(block.rows(), 0);
+                filter[position] = 1;
+                writer_positions.insert({writer, std::move(filter)});
+                _partitions_to_writers.insert({partition_name, writer});
+                writer_ptr = writer;
+            } catch (doris::Exception& e) {
+                return e.to_status();
+            }
+            return Status::OK();
+        };
+
+        auto writer_iter = _partitions_to_writers.find(partition_name);
+        if (writer_iter == _partitions_to_writers.end()) {
+            std::shared_ptr<VHivePartitionWriter> writer;
+            RETURN_IF_ERROR(create_and_open_writer(partition_name, i, writer));
+        } else {
+            std::shared_ptr<VHivePartitionWriter> writer;
+            if (writer_iter->second->written_len() > config::hive_sink_max_file_size) {
+                static_cast<void>(writer_iter->second->close(Status::OK()));
+                writer_positions.erase(writer_iter->second);
+                _partitions_to_writers.erase(writer_iter);
+                RETURN_IF_ERROR(create_and_open_writer(partition_name, i, writer));
+            } else {
+                writer = writer_iter->second;
+            }
+            auto writer_pos_iter = writer_positions.find(writer);
+            if (writer_pos_iter == writer_positions.end()) {
+                IColumn::Filter filter(block.rows(), 0);
+                filter[i] = 1;
+                writer_positions.insert({writer, std::move(filter)});
+            } else {
+                writer_pos_iter->second[i] = 1;
+            }
+        }
+    }
+
+    for (auto it = writer_positions.begin(); it != writer_positions.end(); ++it) {
+        RETURN_IF_ERROR(it->first->write(block, &it->second));
+    }
+    return Status::OK();
+}
+
+Status VHiveTableWriter::close(Status status) {
+    for (const auto& pair : _partitions_to_writers) {
+        Status st = pair.second->close(status);
+        if (st != Status::OK()) {
+            LOG(WARNING) << fmt::format("Unsupported type for partition {}", st.to_string());
+            continue;
+        }
+    }
+    _partitions_to_writers.clear();
+    return Status::OK();
+}
+
+std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer(
+        vectorized::Block& block, int position) {
+    auto& hive_table_sink = _t_sink.hive_table_sink;
+    std::vector<std::string> partition_values;
+    std::string partition_name;
+    if (!_partition_columns_input_index.empty()) {
+        partition_values = _create_partition_values(block, position);
+        partition_name = VHiveUtils::make_partition_name(
+                hive_table_sink.columns, _partition_columns_input_index, partition_values);
+    }
+    const std::vector<THivePartition>& partitions = hive_table_sink.partitions;
+    const THiveLocationParams& write_location = hive_table_sink.location;
+    const THivePartition* existing_partition = nullptr;
+    bool existing_table = true;
+    for (const auto& partition : partitions) {
+        if (partition_values == partition.values) {
+            existing_partition = &partition;
+            break;
+        }
+    }
+    TUpdateMode::type update_mode;
+    VHivePartitionWriter::WriteInfo write_info;
+    TFileFormatType::type file_format_type;
+    TFileCompressType::type write_compress_type;
+    if (existing_partition == nullptr) { // new partition
+        if (existing_table == false) {   // new table

Review Comment:
   warning: redundant boolean literal supplied to boolean operator [readability-simplify-boolean-expr]
   
   ```suggestion
           if (!existing_table) {   // new table
   ```
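   The rule behind this suggestion: comparing a `bool` against a literal (`x == false`, `x == true`) is redundant and reads worse than the direct form. Illustrative only, with invented function names:

   ```cpp
   #include <cassert>

   // Both functions are equivalent; clang-tidy flags the first form.
   bool verbose(bool existing_table) {
       return existing_table == false; // flagged: redundant boolean literal
   }

   bool simplified(bool existing_table) {
       return !existing_table; // the suggested replacement
   }
   ```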
   



##########
be/src/vec/sink/writer/vhive_table_writer.cpp:
##########
@@ -0,0 +1,432 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vhive_table_writer.h"
+
+#include "runtime/runtime_state.h"
+#include "vec/core/block.h"
+#include "vec/core/column_with_type_and_name.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
+#include "vec/sink/writer/vhive_partition_writer.h"
+#include "vec/sink/writer/vhive_utils.h"
+
+namespace doris {
+namespace vectorized {
+
+VHiveTableWriter::VHiveTableWriter(const TDataSink& t_sink,
+                                   const VExprContextSPtrs& output_expr_ctxs)
+        : AsyncResultWriter(output_expr_ctxs), _t_sink(t_sink) {
+    DCHECK(_t_sink.__isset.hive_table_sink);
+}
+
+Status VHiveTableWriter::init_properties(ObjectPool* pool) {
+    return Status::OK();
+}
+
+Status VHiveTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {
+    _state = state;
+    _profile = profile;
+
+    for (int i = 0; i < _t_sink.hive_table_sink.columns.size(); ++i) {
+        if (_t_sink.hive_table_sink.columns[i].column_type == 
THiveColumnType::PARTITION_KEY) {
+            _partition_columns_input_index.emplace_back(i);
+        }
+    }
+    return Status::OK();
+}
+
+Status VHiveTableWriter::write(vectorized::Block& block) {

Review Comment:
   warning: function 'write' exceeds recommended size/complexity thresholds [readability-function-size]
   ```cpp
   Status VHiveTableWriter::write(vectorized::Block& block) {
                            ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/vec/sink/writer/vhive_table_writer.cpp:52:** 94 lines including whitespace and comments (threshold 80)
   ```cpp
   Status VHiveTableWriter::write(vectorized::Block& block) {
                            ^
   ```
   
   </details>
   



##########
be/test/util/indexed_priority_queue_test.cpp:
##########
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/indexed_priority_queue.hpp"

Review Comment:
   warning: 'util/indexed_priority_queue.hpp' file not found [clang-diagnostic-error]
   ```cpp
   #include "util/indexed_priority_queue.hpp"
            ^
   ```
   



##########
be/test/vec/exec/skewed_partition_rebalancer_test.cpp:
##########
@@ -0,0 +1,318 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is porting from
+// https://github.com/trinodb/trino/blob/master/core/trino-main/src/test/java/io/trino/operator/output/TestSkewedPartitionRebalancer.java
+// to cpp and modified by Doris
+
+#include "vec/exec/skewed_partition_rebalancer.h"
+
+#include <gtest/gtest.h>
+
+#include <list>
+
+namespace doris::vectorized {
+
+class SkewedPartitionRebalancerTest : public testing::Test {
+public:
+    SkewedPartitionRebalancerTest() = default;
+    virtual ~SkewedPartitionRebalancerTest() = default;
+
+private:
+    std::vector<std::list<int>> _get_partition_positions(
+            std::unique_ptr<SkewedPartitionRebalancer>& rebalancer,
+            std::vector<long>& partition_row_count, int partition_count, int max_position) {
+        std::vector<std::list<int>> partitionPositions(rebalancer->get_task_count());
+
+        for (int partition = 0; partition < rebalancer->get_task_count(); partition++) {
+            partitionPositions[partition] = std::list<int>();
+        }
+
+        for (int position = 0; position < max_position; position++) {
+            int partition = position % partition_count;
+            partition = rebalancer->get_task_id(partition, partition_row_count[partition]++);
+            partitionPositions[partition].push_back(position);
+        }
+
+        return partitionPositions;
+    }
+
+    bool _vectors_equal(const std::vector<std::list<int>>& vec1,

Review Comment:
   warning: method '_vectors_equal' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
       static bool _vectors_equal(const std::vector<std::list<int>>& vec1,
   ```
   



##########
be/src/vec/sink/writer/vhive_utils.h:
##########
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/DataSinks_types.h>
+
+#include <algorithm>
+#include <iostream>
+#include <regex>
+#include <sstream>
+#include <string>
+#include <vector>
+
+namespace doris {
+namespace vectorized {

Review Comment:
   warning: nested namespaces can be concatenated [modernize-concat-nested-namespaces]
   
   ```suggestion
   namespace doris::vectorized {
   ```
   
   be/src/vec/sink/writer/vhive_utils.h:43:
   ```diff
   - } // namespace vectorized
   - } // namespace doris
   + } // namespace doris
   ```
   



##########
be/test/vec/exec/skewed_partition_rebalancer_test.cpp:
##########
@@ -0,0 +1,318 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is porting from
+// https://github.com/trinodb/trino/blob/master/core/trino-main/src/test/java/io/trino/operator/output/TestSkewedPartitionRebalancer.java
+// to cpp and modified by Doris
+
+#include "vec/exec/skewed_partition_rebalancer.h"
+
+#include <gtest/gtest.h>
+
+#include <list>
+
+namespace doris::vectorized {
+
+class SkewedPartitionRebalancerTest : public testing::Test {
+public:
+    SkewedPartitionRebalancerTest() = default;
+    virtual ~SkewedPartitionRebalancerTest() = default;
+
+private:
+    std::vector<std::list<int>> _get_partition_positions(
+            std::unique_ptr<SkewedPartitionRebalancer>& rebalancer,
+            std::vector<long>& partition_row_count, int partition_count, int max_position) {
+        std::vector<std::list<int>> partitionPositions(rebalancer->get_task_count());
+
+        for (int partition = 0; partition < rebalancer->get_task_count(); partition++) {
+            partitionPositions[partition] = std::list<int>();
+        }
+
+        for (int position = 0; position < max_position; position++) {
+            int partition = position % partition_count;
+            partition = rebalancer->get_task_id(partition, partition_row_count[partition]++);
+            partitionPositions[partition].push_back(position);
+        }
+
+        return partitionPositions;
+    }
+
+    bool _vectors_equal(const std::vector<std::list<int>>& vec1,
+                        const std::vector<std::list<int>>& vec2) {
+        if (vec1.size() != vec2.size()) {
+            return false;
+        }
+        for (size_t i = 0; i < vec1.size(); i++) {
+            if (vec1[i] != vec2[i]) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    bool _compare_vector_of_lists(const std::vector<std::list<int>>& expected,

Review Comment:
   warning: method '_compare_vector_of_lists' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
       static bool _compare_vector_of_lists(const std::vector<std::list<int>>& expected,
   ```
   



##########
be/src/vec/sink/writer/vhive_partition_writer.cpp:
##########
@@ -0,0 +1,282 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vhive_partition_writer.h"
+
+#include "io/file_factory.h"
+#include "io/fs/file_system.h"
+#include "runtime/runtime_state.h"
+#include "vec/core/materialize_block.h"
+#include "vec/runtime/vorc_transformer.h"
+#include "vec/runtime/vparquet_transformer.h"
+
+namespace doris {
+namespace vectorized {
+
+VHivePartitionWriter::VHivePartitionWriter(
+        const TDataSink& t_sink, const std::string partition_name, TUpdateMode::type update_mode,
+        const VExprContextSPtrs& output_expr_ctxs, const std::vector<THiveColumn>& columns,
+        WriteInfo write_info, const std::string file_name, TFileFormatType::type file_format_type,
+        TFileCompressType::type hive_compress_type,
+        const std::map<std::string, std::string>& hadoop_conf)
+        : _partition_name(std::move(partition_name)),
+          _update_mode(update_mode),
+          _vec_output_expr_ctxs(output_expr_ctxs),
+          _columns(columns),
+          _write_info(std::move(write_info)),
+          _file_name(std::move(file_name)),
+          _file_format_type(file_format_type),
+          _hive_compress_type(hive_compress_type),
+          _hadoop_conf(hadoop_conf)
+
+{}
+
+Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile) {
+    _state = state;
+
+    std::vector<TNetworkAddress> broker_addresses;
+    RETURN_IF_ERROR(FileFactory::create_file_writer(
+            _write_info.file_type, state->exec_env(), broker_addresses, _hadoop_conf,
+            fmt::format("{}/{}", _write_info.write_path, _file_name), 0, _file_writer_impl));
+
+    switch (_file_format_type) {
+    case TFileFormatType::FORMAT_PARQUET: {
+        bool parquet_disable_dictionary = false;
+        TParquetCompressionType::type parquet_compression_type;
+        switch (_hive_compress_type) {
+        case TFileCompressType::PLAIN: {
+            parquet_compression_type = TParquetCompressionType::UNCOMPRESSED;
+            break;
+        }
+        case TFileCompressType::SNAPPYBLOCK: {
+            parquet_compression_type = TParquetCompressionType::SNAPPY;
+            break;
+        }
+        case TFileCompressType::ZSTD: {
+            parquet_compression_type = TParquetCompressionType::ZSTD;
+            break;
+        }
+        default: {
+            return Status::InternalError("Unsupported hive compress type {} with parquet",
+                                         to_string(_hive_compress_type));
+                                         to_string(_hive_compress_type));
+        }
+        }
+        std::vector<TParquetSchema> parquet_schemas;
+        for (int i = 0; i < _columns.size(); i++) {
+            VExprSPtr column_expr = _vec_output_expr_ctxs[i]->root();
+            TParquetSchema parquet_schema;
+            parquet_schema.schema_column_name = _columns[i].name;
+            parquet_schemas.emplace_back(std::move(parquet_schema));
+        }
+        _vfile_writer.reset(new VParquetTransformer(
+                state, _file_writer_impl.get(), _vec_output_expr_ctxs, parquet_schemas,
+                parquet_compression_type, parquet_disable_dictionary, TParquetVersion::PARQUET_1_0,
+                false));
+        return _vfile_writer->open();
+    }
+    case TFileFormatType::FORMAT_ORC: {
+        orc::CompressionKind orc_compression_type;
+        switch (_hive_compress_type) {
+        case TFileCompressType::PLAIN: {
+            orc_compression_type = orc::CompressionKind::CompressionKind_NONE;
+            break;
+        }
+        case TFileCompressType::SNAPPYBLOCK: {
+            orc_compression_type = orc::CompressionKind::CompressionKind_SNAPPY;
+            break;
+        }
+        case TFileCompressType::ZLIB: {
+            orc_compression_type = orc::CompressionKind::CompressionKind_ZLIB;
+            break;
+        }
+        case TFileCompressType::ZSTD: {
+            orc_compression_type = orc::CompressionKind::CompressionKind_ZSTD;
+            break;
+        }
+        default: {
+            return Status::InternalError("Unsupported type {} with orc", _hive_compress_type);
+        }
+        }
+        orc_compression_type = orc::CompressionKind::CompressionKind_ZLIB;
+
+        std::unique_ptr<orc::Type> root_schema = orc::createStructType();
+        for (int i = 0; i < _columns.size(); i++) {
+            VExprSPtr column_expr = _vec_output_expr_ctxs[i]->root();
+            try {
+                root_schema->addStructField(_columns[i].name, _build_orc_type(column_expr->type()));
+            } catch (doris::Exception& e) {
+                return e.to_status();
+            }
+        }
+
+        _vfile_writer.reset(new VOrcTransformer(state, _file_writer_impl.get(),
+                                                _vec_output_expr_ctxs, std::move(root_schema),
+                                                false, orc_compression_type));
+        return _vfile_writer->open();
+    }
+    default: {
+        return Status::InternalError("Unsupported file format type {}",
+                                     to_string(_file_format_type));
+    }
+    }
+}
+
+Status VHivePartitionWriter::close(Status status) {
+    if (_vfile_writer != nullptr) {
+        Status st = _vfile_writer->close();
+        if (st != Status::OK()) {
+            LOG(WARNING) << fmt::format("_vfile_writer close failed, reason: {}", st.to_string());
+        }
+    }
+    if (status != Status::OK()) {
+        auto path = fmt::format("{}/{}", _write_info.write_path, _file_name);
+        Status st = _file_writer_impl->fs()->delete_file(path);
+        if (st != Status::OK()) {
+            LOG(WARNING) << fmt::format("Delete file {} failed, reason: {}", path, st.to_string());
+        }
+    }
+    _state->hive_partition_updates().emplace_back(_build_partition_update());
+    return Status::OK();
+}
+
+Status VHivePartitionWriter::write(vectorized::Block& block, vectorized::IColumn::Filter* filter) {
+    Block output_block;
+    RETURN_IF_ERROR(_projection_and_filter_block(block, filter, &output_block));
+    RETURN_IF_ERROR(_vfile_writer->write(output_block));
+    _row_count += output_block.rows();
+    _input_size_in_bytes += output_block.bytes();
+    return Status::OK();
+}
+
+std::unique_ptr<orc::Type> VHivePartitionWriter::_build_orc_type(
+        const TypeDescriptor& type_descriptor) {
+    std::pair<Status, std::unique_ptr<orc::Type>> result;
+    switch (type_descriptor.type) {
+    case TYPE_BOOLEAN: {
+        return orc::createPrimitiveType(orc::BOOLEAN);
+    }
+    case TYPE_TINYINT: {
+        return orc::createPrimitiveType(orc::BYTE);
+    }
+    case TYPE_SMALLINT: {
+        return orc::createPrimitiveType(orc::SHORT);
+    }
+    case TYPE_INT: {
+        return orc::createPrimitiveType(orc::INT);
+    }
+    case TYPE_BIGINT: {
+        return orc::createPrimitiveType(orc::LONG);
+    }
+    case TYPE_FLOAT: {
+        return orc::createPrimitiveType(orc::FLOAT);
+    }
+    case TYPE_DOUBLE: {
+        return orc::createPrimitiveType(orc::DOUBLE);
+    }
+    case TYPE_CHAR: {
+        return orc::createCharType(orc::CHAR, type_descriptor.len);
+    }
+    case TYPE_VARCHAR: {
+        return orc::createCharType(orc::VARCHAR, type_descriptor.len);
+    }
+    case TYPE_STRING: {
+        return orc::createPrimitiveType(orc::STRING);
+    }
+    case TYPE_BINARY: {
+        return orc::createPrimitiveType(orc::STRING);
+    }
+    case TYPE_DATEV2: {
+        return orc::createPrimitiveType(orc::DATE);
+    }
+    case TYPE_DATETIMEV2: {
+        return orc::createPrimitiveType(orc::TIMESTAMP);
+    }
+    case TYPE_DECIMAL32: {
+        return orc::createDecimalType(type_descriptor.precision, type_descriptor.scale);
+    }
+    case TYPE_DECIMAL64: {
+        return orc::createDecimalType(type_descriptor.precision, type_descriptor.scale);
+    }
+    case TYPE_DECIMAL128I: {
+        return orc::createDecimalType(type_descriptor.precision, type_descriptor.scale);
+    }
+    case TYPE_STRUCT: {
+        std::unique_ptr<orc::Type> struct_type = orc::createStructType();
+        for (int j = 0; j < type_descriptor.children.size(); ++j) {
+            struct_type->addStructField(type_descriptor.field_names[j],
+                                        _build_orc_type(type_descriptor.children[j]));
+        }
+        return struct_type;
+    }
+    case TYPE_ARRAY: {
+        return orc::createListType(_build_orc_type(type_descriptor.children[0]));
+    }
+    case TYPE_MAP: {
+        return orc::createMapType(_build_orc_type(type_descriptor.children[0]),
+                                  _build_orc_type(type_descriptor.children[1]));
+    }
+    default: {
+        throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+                               "Unsupported type {} to build orc type",
+                               type_descriptor.debug_string());
+    }
+    }
+}
+
+Status VHivePartitionWriter::_projection_and_filter_block(doris::vectorized::Block& input_block,

Review Comment:
   warning: method '_projection_and_filter_block' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
    static Status VHivePartitionWriter::_projection_and_filter_block(doris::vectorized::Block& input_block,
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
