Copilot commented on code in PR #48720: URL: https://github.com/apache/doris/pull/48720#discussion_r2660088096
########## be/test/vec/sink/tablet_sink_hash_partitioner_test.cpp: ########## @@ -0,0 +1,282 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/sink/tablet_sink_hash_partitioner.h" + +#include <gen_cpp/DataSinks_types.h> +#include <gen_cpp/Descriptors_types.h> +#include <gen_cpp/Exprs_types.h> +#include <gen_cpp/FrontendService_types.h> +#include <gen_cpp/Partitions_types.h> +#include <gen_cpp/Status_types.h> + +#include <gtest/gtest.h> + +#include <cstdint> +#include <functional> +#include <limits> +#include <memory> +#include <utility> +#include <vector> + +#include "common/cast_set.h" +#include "common/config.h" +#include "exec/tablet_info.h" +#include "pipeline/exec/exchange_sink_operator.h" +#include "pipeline/operator/operator_helper.h" +#include "runtime/descriptor_helper.h" +#include "runtime/descriptors.h" +#include "runtime/types.h" +#include "testutil/column_helper.h" +#include "testutil/mock/mock_descriptors.h" +#include "util/debug_points.h" +#include "util/hash_util.hpp" +#include "util/runtime_profile.h" +#include "vec/common/assert_cast.h" +#include "vec/columns/column_vector.h" +#include "vec/data_types/data_type_nullable.h" +#include "vec/data_types/data_type_number.h" +#include "vec/sink/sink_test_utils.h" +#include "vec/sink/vtablet_finder.h" + +namespace doris::vectorized { + +namespace { + +using doris::pipeline::ExchangeSinkLocalState; +using doris::pipeline::ExchangeSinkOperatorX; +using doris::pipeline::OperatorContext; + +std::shared_ptr<ExchangeSinkOperatorX> _create_parent_operator( + OperatorContext& ctx, const std::shared_ptr<doris::MockRowDescriptor>& row_desc_holder) { + TDataStreamSink sink; + sink.dest_node_id = 0; + sink.output_partition.type = TPartitionType::UNPARTITIONED; + + return std::make_shared<ExchangeSinkOperatorX>(&ctx.state, *row_desc_holder, 0, sink, + std::vector<TPlanFragmentDestination> {}, + std::vector<TUniqueId> {}); +} + +std::unique_ptr<TabletSinkHashPartitioner> _create_partitioner( + OperatorContext& ctx, ExchangeSinkLocalState* local_state, size_t partition_count, + int64_t txn_id, const TOlapTableSchemaParam& schema, const TOlapTablePartitionParam& partition, + const TOlapTableLocationParam& location, TTupleId tablet_sink_tuple_id) { + auto partitioner = std::make_unique<TabletSinkHashPartitioner>( + partition_count, txn_id, schema, partition, location, tablet_sink_tuple_id, + local_state); + auto st = partitioner->open(&ctx.state); + EXPECT_TRUE(st.ok()) << st.to_string(); + return partitioner; +} + +TEST(TabletSinkHashPartitionerTest, DoPartitioningSkipsImmutablePartitionAndHashesOthers) { + OperatorContext ctx; + constexpr size_t partition_count = 8; + constexpr int64_t txn_id = 1; + + TOlapTableSchemaParam tschema; + TTupleId tablet_sink_tuple_id = 0; + int64_t schema_index_id = 0; + sink_test_utils::build_desc_tbl_and_schema(ctx, tschema, tablet_sink_tuple_id, schema_index_id, + false); + + auto row_desc_holder = std::make_shared<doris::MockRowDescriptor>( + std::vector<DataTypePtr> {std::make_shared<DataTypeInt32>()}, &ctx.pool); + auto parent_op = _create_parent_operator(ctx, row_desc_holder); + ExchangeSinkLocalState local_state(parent_op.get(), &ctx.state); + + auto tpartition = sink_test_utils::build_partition_param(schema_index_id); + ASSERT_EQ(tpartition.partitions.size(), 2); + tpartition.partitions[0].__set_is_mutable(false); + auto tlocation = sink_test_utils::build_location_param(); + + auto partitioner = _create_partitioner(ctx, &local_state, partition_count, txn_id, tschema, + tpartition, tlocation, tablet_sink_tuple_id); + + auto block = ColumnHelper::create_block<DataTypeInt32>({1, 25}); + auto st = partitioner->do_partitioning(&ctx.state, &block); + ASSERT_TRUE(st.ok()) << st.to_string(); + + const auto& skipped = partitioner->get_skipped(cast_set<int>(block.rows())); + ASSERT_EQ(skipped.size(), block.rows()); + EXPECT_TRUE(skipped[0]); + EXPECT_FALSE(skipped[1]); + + auto channel_ids = partitioner->get_channel_ids(); + auto* hashes = reinterpret_cast<const TabletSinkHashPartitioner::HashValType*>( + channel_ids.channel_id); + ASSERT_NE(hashes, nullptr); + EXPECT_EQ(hashes[0], -1); + + int64_t tablet_id = 200; + auto hash = HashUtil::zlib_crc_hash(&tablet_id, sizeof(int64_t), 0); + EXPECT_EQ(hashes[1], static_cast<int64_t>(hash % partition_count)); +} + +TEST(TabletSinkHashPartitionerTest, TryCutInLineCreatesPartitionAndReturnsBatchedBlock) { + OperatorContext ctx; + constexpr size_t partition_count = 8; + constexpr int64_t txn_id = 1; + + TOlapTableSchemaParam tschema; + TTupleId tablet_sink_tuple_id = 0; + int64_t schema_index_id = 0; + sink_test_utils::build_desc_tbl_and_schema(ctx, tschema, tablet_sink_tuple_id, schema_index_id, + false); + TSlotId partition_slot_id = tschema.slot_descs[0].id; + + auto row_desc_holder = std::make_shared<doris::MockRowDescriptor>( + std::vector<DataTypePtr> {std::make_shared<DataTypeInt32>()}, &ctx.pool); + auto parent_op = _create_parent_operator(ctx, row_desc_holder); + ExchangeSinkLocalState local_state(parent_op.get(), &ctx.state); + + auto tpartition = sink_test_utils::build_auto_partition_param( + schema_index_id, tablet_sink_tuple_id, partition_slot_id); + auto tlocation = sink_test_utils::build_location_param(); + + auto partitioner = _create_partitioner(ctx, &local_state, partition_count, txn_id, tschema, + tpartition, tlocation, tablet_sink_tuple_id); + + doris::config::enable_debug_points = true; + doris::DebugPoints::instance()->clear(); + + bool injected = false; + std::function<void(doris::TCreatePartitionRequest*, doris::TCreatePartitionResult*)> handler = + [&](doris::TCreatePartitionRequest* req, doris::TCreatePartitionResult* res) { + injected = true; + ASSERT_TRUE(req->__isset.partitionValues); + ASSERT_EQ(req->partitionValues.size(), 1); + ASSERT_EQ(req->partitionValues[0].size(), 1); + ASSERT_TRUE(req->partitionValues[0][0].__isset.value); + EXPECT_EQ(req->partitionValues[0][0].value, "15"); + + doris::TStatus tstatus; + tstatus.__set_status_code(doris::TStatusCode::OK); + res->__set_status(tstatus); + + doris::TOlapTablePartition new_part; + new_part.id = 3; + new_part.num_buckets = 1; + new_part.__set_is_mutable(true); + { + doris::TOlapTableIndexTablets index_tablets; + index_tablets.index_id = schema_index_id; + index_tablets.tablets = {300}; + new_part.indexes = {index_tablets}; + } + new_part.__set_start_keys({sink_test_utils::make_int_literal(10)}); + new_part.__set_end_keys({sink_test_utils::make_int_literal(20)}); + res->__set_partitions({new_part}); + + doris::TTabletLocation new_location; + new_location.__set_tablet_id(300); + new_location.__set_node_ids({1}); + res->__set_tablets({new_location}); + }; + doris::DebugPoints::instance()->add_with_callback( + "VRowDistribution.automatic_create_partition.inject_result", handler); + + { + auto block = ColumnHelper::create_block<DataTypeInt32>({15, 15}); + auto st = partitioner->do_partitioning(&ctx.state, &block); + ASSERT_TRUE(st.ok()) << st.to_string(); + + // Flush batching data at end-of-stream. + partitioner->mark_last_block(); + Block batched; + st = partitioner->try_cut_in_line(batched); + ASSERT_TRUE(st.ok()) << st.to_string(); + EXPECT_TRUE(injected); + + ASSERT_EQ(batched.rows(), 2); + ASSERT_EQ(batched.columns(), 1); + const auto& col = batched.get_by_position(0).column; + ASSERT_EQ(col->size(), 2); + EXPECT_EQ(assert_cast<const ColumnInt32&>(*col).get_data()[0], 15); + EXPECT_EQ(assert_cast<const ColumnInt32&>(*col).get_data()[1], 15); + } + + doris::DebugPoints::instance()->clear(); + doris::config::enable_debug_points = false; +} + +TEST(TabletSinkHashPartitionerTest, OlapTabletFinderRoundRobinEveryBatch) { + OperatorContext ctx; + + TOlapTableSchemaParam tschema; + TTupleId tablet_sink_tuple_id = 0; + int64_t schema_index_id = 0; + sink_test_utils::build_desc_tbl_and_schema(ctx, tschema, tablet_sink_tuple_id, schema_index_id, + false); + + auto schema = std::make_shared<OlapTableSchemaParam>(); + auto st = schema->init(tschema); + ASSERT_TRUE(st.ok()) << st.to_string(); + + auto tpartition = + sink_test_utils::build_partition_param_with_load_tablet_idx(schema_index_id, 0); + auto vpartition = std::make_unique<VOlapTablePartitionParam>(schema, tpartition); + st = vpartition->init(); + ASSERT_TRUE(st.ok()) << st.to_string(); + + OlapTabletFinder finder(vpartition.get(), + OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_BATCH); + + { + auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3}); + std::vector<VOlapTablePartition*> partitions(block.rows(), nullptr); + std::vector<uint32_t> tablet_index(block.rows(), 0); + std::vector<bool> skip(block.rows(), false); + + st = finder.find_tablets(&ctx.state, &block, cast_set<int>(block.rows()), partitions, + tablet_index, skip, nullptr); + ASSERT_TRUE(st.ok()) << st.to_string(); + EXPECT_EQ(tablet_index[0], 0); + EXPECT_EQ(tablet_index[1], 0); + EXPECT_EQ(tablet_index[2], 0); + } + + { + auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2}); + std::vector<VOlapTablePartition*> partitions(block.rows(), nullptr); + std::vector<uint32_t> tablet_index(block.rows(), 0); + std::vector<bool> skip(block.rows(), false); + + st = finder.find_tablets(&ctx.state, &block, cast_set<int>(block.rows()), partitions, + tablet_index, skip, nullptr); + ASSERT_TRUE(st.ok()) << st.to_string(); + EXPECT_EQ(tablet_index[0], 1); + EXPECT_EQ(tablet_index[1], 1); + } + + { + auto block = ColumnHelper::create_block<DataTypeInt32>({1}); + std::vector<VOlapTablePartition*> partitions(block.rows(), nullptr); + std::vector<uint32_t> tablet_index(block.rows(), 0); + std::vector<bool> skip(block.rows(), false); + + st = finder.find_tablets(&ctx.state, &block, cast_set<int>(block.rows()), partitions, + tablet_index, skip, nullptr); + ASSERT_TRUE(st.ok()) << st.to_string(); + EXPECT_EQ(tablet_index[0], 0); + } + } + + } // anonymous namespace Review Comment: Inconsistent indentation: this closing brace should not have a leading space to match the project's style. ```suggestion } } // anonymous namespace ``` ########## be/src/vec/sink/vrow_distribution.h: ########## @@ -129,55 +130,55 @@ class VRowDistribution { // mv where clause // v1 needs index->node->row_ids - tabletids // v2 needs index,tablet->rowids - Status generate_rows_distribution(vectorized::Block& input_block, - std::shared_ptr<vectorized::Block>& block, - int64_t& filtered_rows, + Status generate_rows_distribution(Block& input_block, std::shared_ptr<Block>& block, std::vector<RowPartTabletIds>& row_part_tablet_ids, int64_t& rows_stat_val); + // have 2 ways remind to deal batching block: + // 1. in row_distribution, _batching_rows reaches the threshold, this class set _deal_batched = true. + // 2. in caller, after last block and before close, set _deal_batched = true. bool need_deal_batching() const { return _deal_batched && _batching_rows > 0; } - size_t batching_rows() const { return _batching_rows; } // create partitions when need for auto-partition table using #_partitions_need_create. Status automatic_create_partition(); void clear_batching_stats(); + std::vector<bool> get_skipped() { return _skip; } // skipped in last round Review Comment: The get_skipped() method returns std::vector<bool> by value, which causes an unnecessary copy. Consider returning by const reference instead (const std::vector<bool>&) to avoid the copy overhead. ```suggestion const std::vector<bool>& get_skipped() const { return _skip; } // skipped in last round ``` ########## be/src/pipeline/shuffle/writer.cpp: ########## @@ -17,100 +17,235 @@ #include "writer.h" -#include <type_traits> +#include <glog/logging.h> +#include <algorithm> +#include <cstdint> + +#include "common/logging.h" +#include "common/status.h" #include "pipeline/exec/exchange_sink_operator.h" #include "vec/core/block.h" +#include "vec/sink/tablet_sink_hash_partitioner.h" namespace doris::pipeline { #include "common/compile_check_begin.h" + template <typename ChannelPtrType> -Status Writer::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel, Status st) const { +Status WriterBase::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel, + Status st) const { channel->set_receiver_eof(st); - // Chanel will not send RPC to the downstream when eof, so close chanel by OK status. + // Chanel will not send RPC to the downstream when eof, so close channel by OK status. Review Comment: Typo in comment: "Chanel" should be "Channel". ```suggestion // Channel will not send RPC to the downstream when eof, so close channel by OK status. ``` ########## be/test/vec/sink/tablet_sink_hash_partitioner_test.cpp: ########## @@ -0,0 +1,282 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/sink/tablet_sink_hash_partitioner.h" + +#include <gen_cpp/DataSinks_types.h> +#include <gen_cpp/Descriptors_types.h> +#include <gen_cpp/Exprs_types.h> +#include <gen_cpp/FrontendService_types.h> +#include <gen_cpp/Partitions_types.h> +#include <gen_cpp/Status_types.h> + +#include <gtest/gtest.h> + +#include <cstdint> +#include <functional> +#include <limits> +#include <memory> +#include <utility> +#include <vector> + +#include "common/cast_set.h" +#include "common/config.h" +#include "exec/tablet_info.h" +#include "pipeline/exec/exchange_sink_operator.h" +#include "pipeline/operator/operator_helper.h" +#include "runtime/descriptor_helper.h" +#include "runtime/descriptors.h" +#include "runtime/types.h" +#include "testutil/column_helper.h" +#include "testutil/mock/mock_descriptors.h" +#include "util/debug_points.h" +#include "util/hash_util.hpp" +#include "util/runtime_profile.h" +#include "vec/common/assert_cast.h" +#include "vec/columns/column_vector.h" +#include "vec/data_types/data_type_nullable.h" +#include "vec/data_types/data_type_number.h" +#include "vec/sink/sink_test_utils.h" +#include "vec/sink/vtablet_finder.h" + +namespace doris::vectorized { + +namespace { + +using doris::pipeline::ExchangeSinkLocalState; +using doris::pipeline::ExchangeSinkOperatorX; +using doris::pipeline::OperatorContext; + +std::shared_ptr<ExchangeSinkOperatorX> _create_parent_operator( + OperatorContext& ctx, const std::shared_ptr<doris::MockRowDescriptor>& row_desc_holder) { + TDataStreamSink sink; + sink.dest_node_id = 0; + sink.output_partition.type = TPartitionType::UNPARTITIONED; + + return std::make_shared<ExchangeSinkOperatorX>(&ctx.state, *row_desc_holder, 0, sink, + std::vector<TPlanFragmentDestination> {}, + std::vector<TUniqueId> {}); +} + +std::unique_ptr<TabletSinkHashPartitioner> _create_partitioner( + OperatorContext& ctx, ExchangeSinkLocalState* local_state, size_t partition_count, + int64_t txn_id, const TOlapTableSchemaParam& schema, const TOlapTablePartitionParam& partition, + const TOlapTableLocationParam& location, TTupleId tablet_sink_tuple_id) { + auto partitioner = std::make_unique<TabletSinkHashPartitioner>( + partition_count, txn_id, schema, partition, location, tablet_sink_tuple_id, + local_state); + auto st = partitioner->open(&ctx.state); + EXPECT_TRUE(st.ok()) << st.to_string(); + return partitioner; +} + +TEST(TabletSinkHashPartitionerTest, DoPartitioningSkipsImmutablePartitionAndHashesOthers) { + OperatorContext ctx; + constexpr size_t partition_count = 8; + constexpr int64_t txn_id = 1; + + TOlapTableSchemaParam tschema; + TTupleId tablet_sink_tuple_id = 0; + int64_t schema_index_id = 0; + sink_test_utils::build_desc_tbl_and_schema(ctx, tschema, tablet_sink_tuple_id, schema_index_id, + false); + + auto row_desc_holder = std::make_shared<doris::MockRowDescriptor>( + std::vector<DataTypePtr> {std::make_shared<DataTypeInt32>()}, &ctx.pool); + auto parent_op = _create_parent_operator(ctx, row_desc_holder); + ExchangeSinkLocalState local_state(parent_op.get(), &ctx.state); + + auto tpartition = sink_test_utils::build_partition_param(schema_index_id); + ASSERT_EQ(tpartition.partitions.size(), 2); + tpartition.partitions[0].__set_is_mutable(false); + auto tlocation = sink_test_utils::build_location_param(); + + auto partitioner = _create_partitioner(ctx, &local_state, partition_count, txn_id, tschema, + tpartition, tlocation, tablet_sink_tuple_id); + + auto block = ColumnHelper::create_block<DataTypeInt32>({1, 25}); + auto st = partitioner->do_partitioning(&ctx.state, &block); + ASSERT_TRUE(st.ok()) << st.to_string(); + + const auto& skipped = partitioner->get_skipped(cast_set<int>(block.rows())); + ASSERT_EQ(skipped.size(), block.rows()); + EXPECT_TRUE(skipped[0]); + EXPECT_FALSE(skipped[1]); + + auto channel_ids = partitioner->get_channel_ids(); + auto* hashes = reinterpret_cast<const TabletSinkHashPartitioner::HashValType*>( + channel_ids.channel_id); + ASSERT_NE(hashes, nullptr); + EXPECT_EQ(hashes[0], -1); + + int64_t tablet_id = 200; + auto hash = HashUtil::zlib_crc_hash(&tablet_id, sizeof(int64_t), 0); + EXPECT_EQ(hashes[1], static_cast<int64_t>(hash % partition_count)); +} + +TEST(TabletSinkHashPartitionerTest, TryCutInLineCreatesPartitionAndReturnsBatchedBlock) { + OperatorContext ctx; + constexpr size_t partition_count = 8; + constexpr int64_t txn_id = 1; + + TOlapTableSchemaParam tschema; + TTupleId tablet_sink_tuple_id = 0; + int64_t schema_index_id = 0; + sink_test_utils::build_desc_tbl_and_schema(ctx, tschema, tablet_sink_tuple_id, schema_index_id, + false); + TSlotId partition_slot_id = tschema.slot_descs[0].id; + + auto row_desc_holder = std::make_shared<doris::MockRowDescriptor>( + std::vector<DataTypePtr> {std::make_shared<DataTypeInt32>()}, &ctx.pool); + auto parent_op = _create_parent_operator(ctx, row_desc_holder); + ExchangeSinkLocalState local_state(parent_op.get(), &ctx.state); + + auto tpartition = sink_test_utils::build_auto_partition_param( + schema_index_id, tablet_sink_tuple_id, partition_slot_id); + auto tlocation = sink_test_utils::build_location_param(); + + auto partitioner = _create_partitioner(ctx, &local_state, partition_count, txn_id, tschema, + tpartition, tlocation, tablet_sink_tuple_id); + + doris::config::enable_debug_points = true; + doris::DebugPoints::instance()->clear(); + + bool injected = false; + std::function<void(doris::TCreatePartitionRequest*, doris::TCreatePartitionResult*)> handler = + [&](doris::TCreatePartitionRequest* req, doris::TCreatePartitionResult* res) { + injected = true; + ASSERT_TRUE(req->__isset.partitionValues); + ASSERT_EQ(req->partitionValues.size(), 1); + ASSERT_EQ(req->partitionValues[0].size(), 1); + ASSERT_TRUE(req->partitionValues[0][0].__isset.value); + EXPECT_EQ(req->partitionValues[0][0].value, "15"); + + doris::TStatus tstatus; + tstatus.__set_status_code(doris::TStatusCode::OK); + res->__set_status(tstatus); + + doris::TOlapTablePartition new_part; + new_part.id = 3; + new_part.num_buckets = 1; + new_part.__set_is_mutable(true); + { + doris::TOlapTableIndexTablets index_tablets; + index_tablets.index_id = schema_index_id; + index_tablets.tablets = {300}; + new_part.indexes = {index_tablets}; + } + new_part.__set_start_keys({sink_test_utils::make_int_literal(10)}); + new_part.__set_end_keys({sink_test_utils::make_int_literal(20)}); + res->__set_partitions({new_part}); + + doris::TTabletLocation new_location; + new_location.__set_tablet_id(300); + new_location.__set_node_ids({1}); + res->__set_tablets({new_location}); + }; + doris::DebugPoints::instance()->add_with_callback( + "VRowDistribution.automatic_create_partition.inject_result", handler); + + { + auto block = ColumnHelper::create_block<DataTypeInt32>({15, 15}); + auto st = partitioner->do_partitioning(&ctx.state, &block); + ASSERT_TRUE(st.ok()) << st.to_string(); + + // Flush batching data at end-of-stream. + partitioner->mark_last_block(); + Block batched; + st = partitioner->try_cut_in_line(batched); + ASSERT_TRUE(st.ok()) << st.to_string(); + EXPECT_TRUE(injected); + + ASSERT_EQ(batched.rows(), 2); + ASSERT_EQ(batched.columns(), 1); + const auto& col = batched.get_by_position(0).column; + ASSERT_EQ(col->size(), 2); + EXPECT_EQ(assert_cast<const ColumnInt32&>(*col).get_data()[0], 15); + EXPECT_EQ(assert_cast<const ColumnInt32&>(*col).get_data()[1], 15); + } + + doris::DebugPoints::instance()->clear(); + doris::config::enable_debug_points = false; +} + +TEST(TabletSinkHashPartitionerTest, OlapTabletFinderRoundRobinEveryBatch) { + OperatorContext ctx; + + TOlapTableSchemaParam tschema; + TTupleId tablet_sink_tuple_id = 0; + int64_t schema_index_id = 0; + sink_test_utils::build_desc_tbl_and_schema(ctx, tschema, tablet_sink_tuple_id, schema_index_id, + false); + + auto schema = std::make_shared<OlapTableSchemaParam>(); + auto st = schema->init(tschema); + ASSERT_TRUE(st.ok()) << st.to_string(); + + auto tpartition = + sink_test_utils::build_partition_param_with_load_tablet_idx(schema_index_id, 0); + auto vpartition = std::make_unique<VOlapTablePartitionParam>(schema, tpartition); + st = vpartition->init(); + ASSERT_TRUE(st.ok()) << st.to_string(); + + OlapTabletFinder finder(vpartition.get(), + OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_BATCH); + + { + auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3}); + std::vector<VOlapTablePartition*> partitions(block.rows(), nullptr); + std::vector<uint32_t> tablet_index(block.rows(), 0); + std::vector<bool> skip(block.rows(), false); + + st = finder.find_tablets(&ctx.state, &block, cast_set<int>(block.rows()), partitions, + tablet_index, skip, nullptr); + ASSERT_TRUE(st.ok()) << st.to_string(); + EXPECT_EQ(tablet_index[0], 0); + EXPECT_EQ(tablet_index[1], 0); + EXPECT_EQ(tablet_index[2], 0); + } + + { + auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2}); + std::vector<VOlapTablePartition*> partitions(block.rows(), nullptr); + std::vector<uint32_t> tablet_index(block.rows(), 0); + std::vector<bool> skip(block.rows(), false); + + st = finder.find_tablets(&ctx.state, &block, cast_set<int>(block.rows()), partitions, + tablet_index, skip, nullptr); + ASSERT_TRUE(st.ok()) << st.to_string(); + EXPECT_EQ(tablet_index[0], 1); + EXPECT_EQ(tablet_index[1], 1); + } + + { + auto block = ColumnHelper::create_block<DataTypeInt32>({1}); + std::vector<VOlapTablePartition*> partitions(block.rows(), nullptr); + std::vector<uint32_t> tablet_index(block.rows(), 0); + std::vector<bool> skip(block.rows(), false); + + st = finder.find_tablets(&ctx.state, &block, cast_set<int>(block.rows()), partitions, + tablet_index, skip, nullptr); + ASSERT_TRUE(st.ok()) << st.to_string(); + EXPECT_EQ(tablet_index[0], 0); + } + } + + } // anonymous namespace + Review Comment: Extra blank line before the closing brace. This is inconsistent with the project's style. ```suggestion ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
