airborne12 commented on code in PR #61383:
URL: https://github.com/apache/doris/pull/61383#discussion_r2963754135


##########
be/src/storage/segment/variant/variant_streaming_compaction_writer.cpp:
##########
@@ -0,0 +1,313 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "storage/segment/variant/variant_streaming_compaction_writer.h"
+
+#include <memory>
+
+#include "common/cast_set.h"
+#include "core/column/column_nullable.h"
+#include "core/column/column_variant.h"
+#include "exec/common/variant_util.h"
+#include "storage/index/indexed_column_writer.h"
+#include "storage/iterator/olap_data_convertor.h"
+#include "storage/segment/variant/variant_writer_helpers.h"
+#include "storage/types.h"
+
+namespace doris::segment_v2 {
+
+#include "common/compile_check_begin.h"
+
+VariantStreamingCompactionWriter::VariantStreamingCompactionWriter(
+        const ColumnWriterOptions& opts, const TabletColumn* column,
+        NestedGroupWriteProvider* nested_group_provider, VariantStatistics* 
statistics)
+        : _opts(opts),
+          _tablet_column(column),
+          _nested_group_provider(nested_group_provider),
+          _statistics(statistics) {}
+
+Status VariantStreamingCompactionWriter::init() {
+    
RETURN_IF_ERROR(build_nested_group_streaming_write_plan(_opts.input_rs_readers, 
*_tablet_column,
+                                                            &_streaming_plan));
+    RETURN_IF_ERROR(_init_root_writer());
+    int column_id = 1;
+    RETURN_IF_ERROR(_init_regular_subcolumn_writers(column_id));
+    RETURN_IF_ERROR(_nested_group_provider->init_with_plan(_streaming_plan, 
_tablet_column, _opts,
+                                                           &column_id, 
_statistics));
+    _statistics->to_pb(_opts.meta->mutable_variant_statistics());
+    _phase = Phase::INITIALIZED;
+    return Status::OK();
+}
+
+Status VariantStreamingCompactionWriter::_init_root_writer() {
+    _root_writer = std::make_unique<ScalarColumnWriter>(
+            _opts, 
std::unique_ptr<StorageField>(StorageFieldFactory::create(*_tablet_column)),
+            _opts.file_writer);
+    RETURN_IF_ERROR(_root_writer->init());
+    _opts.meta->set_num_rows(0);
+    return Status::OK();
+}
+
+Status VariantStreamingCompactionWriter::_init_regular_subcolumn_writers(int& 
column_id) {
+    _streaming_regular_subcolumn_writers.clear();
+    for (const auto& plan_entry : _streaming_plan.regular_subcolumns) {
+        TabletColumn tablet_column;
+        TabletIndexes subcolumn_indexes;
+        ColumnWriterOptions opts;
+        std::unique_ptr<ColumnWriter> writer;
+        
RETURN_IF_ERROR(variant_writer_helpers::prepare_subcolumn_writer_target(
+                _opts, *_tablet_column, column_id, plan_entry.path_in_data, 
plan_entry.data_type, 0,
+                0, nullptr /* existing_subcolumn_info */, false /* 
check_storage_type */,
+                &subcolumn_indexes, &opts, &writer, &tablet_column));
+        auto converter = std::make_unique<OlapBlockDataConvertor>();
+        converter->add_column_data_convertor(tablet_column);
+        _subcolumns_indexes.push_back(std::move(subcolumn_indexes));
+        _subcolumn_opts.push_back(opts);
+        _subcolumn_writers.push_back(std::move(writer));
+        _streaming_regular_subcolumn_writers.push_back(
+                StreamingRegularSubcolumnWriter {.plan = plan_entry,
+                                                 .tablet_column = 
std::move(tablet_column),
+                                                 .converter = 
std::move(converter)});
+        ++column_id;
+    }
+    return Status::OK();
+}
+
+Status VariantStreamingCompactionWriter::append_data(const uint8_t** ptr, 
size_t num_rows,
+                                                     const uint8_t* 
outer_null_map) {
+    RETURN_IF_ERROR(_check_initialized("append_data"));
+    RETURN_IF_ERROR(_append_input_from_raw(ptr, num_rows, outer_null_map));
+    if (num_rows > 0 && _phase == Phase::INITIALIZED) {
+        _phase = Phase::APPENDING;
+    }
+    return Status::OK();
+}
+
+Status VariantStreamingCompactionWriter::_append_input_from_raw(const 
uint8_t** ptr,
+                                                                size_t 
num_rows,
+                                                                const uint8_t* 
outer_null_map) {
+    const auto* column = reinterpret_cast<const VariantColumnData*>(*ptr);
+    const auto& src = *reinterpret_cast<const 
ColumnVariant*>(column->column_data);
+    RETURN_IF_ERROR(src.sanitize());
+    return _append_input(src, column->row_pos, num_rows, outer_null_map);
+}
+
+Status VariantStreamingCompactionWriter::_append_input(const ColumnVariant& 
src, size_t row_pos,
+                                                       size_t num_rows,
+                                                       const uint8_t* 
outer_null_map) {
+    auto chunk_variant = ColumnVariant::create(0);
+    chunk_variant->insert_range_from(src, row_pos, num_rows);
+    RETURN_IF_ERROR(chunk_variant->sanitize());
+    chunk_variant->finalize();
+    return _append_chunk(*chunk_variant, outer_null_map);
+}
+
+Status VariantStreamingCompactionWriter::_append_root_column(const 
ColumnVariant& chunk_variant,
+                                                             const uint8_t* 
outer_null_map) {
+    auto* variant = const_cast<ColumnVariant*>(&chunk_variant);

Review Comment:
   **P3 — `const_cast` on `const ColumnVariant&` parameter**
   
   Both `_append_root_column` and `_append_regular_subcolumns` accept a
   `const ColumnVariant&` but mutate the argument through `const_cast`. That is
   only well-defined because the referenced object is not itself const: the
   caller always passes a mutable, locally created chunk, so this is safe in
   practice. Nit: changing the parameter to `ColumnVariant&` (along with
   `_append_chunk`) would make the signatures reflect that the argument is
   mutated.



##########
be/src/exec/common/variant_util.cpp:
##########
@@ -969,6 +983,16 @@ Status VariantCompactionUtil::check_path_stats(const 
std::vector<RowsetSharedPtr
             return Status::OK();
         }
     }
+    for (const auto& column : output->tablet_schema()->columns()) {
+        if (column->is_variant_type() && 
!should_check_variant_path_stats(*column)) {
+            return Status::OK();
+        }
+    }
+    for (const auto& column : output->tablet_schema()->columns()) {
+        if (!column->is_variant_type()) {
+            continue;

Review Comment:
   **P2 — Empty loop body (likely refactoring leftover)**
   
   This loop iterates over output columns, skips non-variant ones with 
`continue`, but does nothing with the variant columns. Looks like the loop body 
was removed during refactoring but the skeleton was left behind. Should either 
be deleted or have its intended logic filled in.



##########
be/src/exec/common/variant_util.h:
##########
@@ -72,6 +72,18 @@ bool glob_match_re2(const std::string& glob_pattern, const 
std::string& candidat
 using PathToNoneNullValues = std::unordered_map<std::string, int64_t>;
 using PathToDataTypes = std::unordered_map<PathInData, 
std::vector<DataTypePtr>, PathInData::Hash>;
 
+inline bool should_record_variant_path_stats(const TabletColumn& column) {

Review Comment:
   **P2 — Three `should_*` helpers have identical implementations**
   
   `should_record_variant_path_stats`, `should_write_variant_binary_columns`, 
and `should_check_variant_path_stats` all return 
`!column.variant_enable_nested_group()`. If they are expected to diverge in the 
future, please add a comment explaining the intended semantic differences. 
Otherwise consider consolidating into a single `is_nested_group_variant()` 
helper.



##########
be/src/exec/common/variant_util.cpp:
##########
@@ -936,6 +943,13 @@ Status VariantCompactionUtil::check_path_stats(const 
std::vector<RowsetSharedPtr
     if (output->tablet_schema()->num_variant_columns() == 0) {
         return Status::OK();
     }
+    for (const auto& rowset : intputs) {
+        for (const auto& column : rowset->tablet_schema()->columns()) {
+            if (column->is_variant_type() && 
!should_check_variant_path_stats(*column)) {
+                return Status::OK();

Review Comment:
   **P1 — `check_path_stats` early return is too coarse-grained**
   
   This `return Status::OK()` bails out of the **entire** `check_path_stats`
   function as soon as any single variant column has nested group (NG)
   enabled. Since `variant_enable_nested_group` is a per-column property, a
   schema that mixes NG and non-NG variant columns would also skip path-stats
   validation for the non-NG columns.
   
   The same pattern appears again at line 988 for the output schema.
   
   Since this function validates data integrity after compaction, silently
   skipping it is risky. Consider replacing the function-level `return` with a
   per-column `continue` in the downstream `aggregate_path_to_stats` loop, so
   that only the NG columns are excluded from the check.



##########
be/src/storage/segment/variant/variant_streaming_compaction_writer.cpp:
##########
@@ -0,0 +1,313 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "storage/segment/variant/variant_streaming_compaction_writer.h"
+
+#include <memory>
+
+#include "common/cast_set.h"
+#include "core/column/column_nullable.h"
+#include "core/column/column_variant.h"
+#include "exec/common/variant_util.h"
+#include "storage/index/indexed_column_writer.h"
+#include "storage/iterator/olap_data_convertor.h"
+#include "storage/segment/variant/variant_writer_helpers.h"
+#include "storage/types.h"
+
+namespace doris::segment_v2 {
+
+#include "common/compile_check_begin.h"
+
+VariantStreamingCompactionWriter::VariantStreamingCompactionWriter(
+        const ColumnWriterOptions& opts, const TabletColumn* column,
+        NestedGroupWriteProvider* nested_group_provider, VariantStatistics* 
statistics)
+        : _opts(opts),
+          _tablet_column(column),
+          _nested_group_provider(nested_group_provider),
+          _statistics(statistics) {}
+
+Status VariantStreamingCompactionWriter::init() {
+    
RETURN_IF_ERROR(build_nested_group_streaming_write_plan(_opts.input_rs_readers, 
*_tablet_column,
+                                                            &_streaming_plan));
+    RETURN_IF_ERROR(_init_root_writer());
+    int column_id = 1;
+    RETURN_IF_ERROR(_init_regular_subcolumn_writers(column_id));
+    RETURN_IF_ERROR(_nested_group_provider->init_with_plan(_streaming_plan, 
_tablet_column, _opts,
+                                                           &column_id, 
_statistics));
+    _statistics->to_pb(_opts.meta->mutable_variant_statistics());
+    _phase = Phase::INITIALIZED;
+    return Status::OK();
+}
+
+Status VariantStreamingCompactionWriter::_init_root_writer() {
+    _root_writer = std::make_unique<ScalarColumnWriter>(
+            _opts, 
std::unique_ptr<StorageField>(StorageFieldFactory::create(*_tablet_column)),
+            _opts.file_writer);
+    RETURN_IF_ERROR(_root_writer->init());
+    _opts.meta->set_num_rows(0);
+    return Status::OK();
+}
+
+Status VariantStreamingCompactionWriter::_init_regular_subcolumn_writers(int& 
column_id) {
+    _streaming_regular_subcolumn_writers.clear();
+    for (const auto& plan_entry : _streaming_plan.regular_subcolumns) {
+        TabletColumn tablet_column;
+        TabletIndexes subcolumn_indexes;
+        ColumnWriterOptions opts;
+        std::unique_ptr<ColumnWriter> writer;
+        
RETURN_IF_ERROR(variant_writer_helpers::prepare_subcolumn_writer_target(
+                _opts, *_tablet_column, column_id, plan_entry.path_in_data, 
plan_entry.data_type, 0,
+                0, nullptr /* existing_subcolumn_info */, false /* 
check_storage_type */,
+                &subcolumn_indexes, &opts, &writer, &tablet_column));
+        auto converter = std::make_unique<OlapBlockDataConvertor>();
+        converter->add_column_data_convertor(tablet_column);
+        _subcolumns_indexes.push_back(std::move(subcolumn_indexes));
+        _subcolumn_opts.push_back(opts);
+        _subcolumn_writers.push_back(std::move(writer));
+        _streaming_regular_subcolumn_writers.push_back(
+                StreamingRegularSubcolumnWriter {.plan = plan_entry,
+                                                 .tablet_column = 
std::move(tablet_column),
+                                                 .converter = 
std::move(converter)});
+        ++column_id;
+    }
+    return Status::OK();
+}
+
+Status VariantStreamingCompactionWriter::append_data(const uint8_t** ptr, 
size_t num_rows,
+                                                     const uint8_t* 
outer_null_map) {
+    RETURN_IF_ERROR(_check_initialized("append_data"));
+    RETURN_IF_ERROR(_append_input_from_raw(ptr, num_rows, outer_null_map));
+    if (num_rows > 0 && _phase == Phase::INITIALIZED) {
+        _phase = Phase::APPENDING;
+    }
+    return Status::OK();
+}
+
+Status VariantStreamingCompactionWriter::_append_input_from_raw(const 
uint8_t** ptr,
+                                                                size_t 
num_rows,
+                                                                const uint8_t* 
outer_null_map) {
+    const auto* column = reinterpret_cast<const VariantColumnData*>(*ptr);
+    const auto& src = *reinterpret_cast<const 
ColumnVariant*>(column->column_data);
+    RETURN_IF_ERROR(src.sanitize());
+    return _append_input(src, column->row_pos, num_rows, outer_null_map);
+}
+
+Status VariantStreamingCompactionWriter::_append_input(const ColumnVariant& 
src, size_t row_pos,
+                                                       size_t num_rows,
+                                                       const uint8_t* 
outer_null_map) {
+    auto chunk_variant = ColumnVariant::create(0);
+    chunk_variant->insert_range_from(src, row_pos, num_rows);
+    RETURN_IF_ERROR(chunk_variant->sanitize());
+    chunk_variant->finalize();

Review Comment:
   **P3 — Redundant `is_finalized()` check after whole-column `finalize()`**
   
   `_append_input` calls `chunk_variant->finalize()` here. By the time
   `_append_regular_subcolumns` checks `!subcolumn->is_finalized()` (line 181),
   `is_finalized()` is therefore always true, so that condition is always false.
   The defensive check doesn't hurt correctness, but its branch is dead code on
   this path.



##########
be/src/exec/common/variant_util.cpp:
##########
@@ -848,6 +856,10 @@ Status 
VariantCompactionUtil::aggregate_variant_extended_info(
         if (!column->is_variant_type()) {
             continue;
         }
+        if (column->variant_enable_nested_group()) {

Review Comment:
   **P2 — Nested-group (NG) column skips `nested_paths` collection without
   explanation**
   
   When `variant_enable_nested_group()` is true, this `continue` skips the
   entire segment traversal, including `get_nested_paths()`. The streaming
   write plan collects paths independently via
   `VariantColumnReader::get_nested_group_readers()`, so this is likely
   correct. Still, a brief comment explaining why the skip is safe would help
   future readers.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to