This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b7776a8901b [env](compile) open compile check for table writers
(#45005)
b7776a8901b is described below
commit b7776a8901ba6ad04ee01dead3b8cfca5d37ce48
Author: TengJianPing <[email protected]>
AuthorDate: Mon Dec 9 15:08:52 2024 +0800
[env](compile) open compile check for table writers (#45005)
---
be/src/vec/exec/format/table/iceberg/struct_like.h | 5 +--
be/src/vec/sink/vrow_distribution.cpp | 16 +++++++--
be/src/vec/sink/vrow_distribution.h | 2 +-
be/src/vec/sink/writer/async_result_writer.cpp | 3 +-
be/src/vec/sink/writer/async_result_writer.h | 2 +-
be/src/vec/sink/writer/iceberg/partition_data.h | 15 +++------
.../sink/writer/iceberg/partition_transformers.h | 38 +++++++++++++++-------
.../sink/writer/iceberg/viceberg_table_writer.cpp | 1 +
be/src/vec/sink/writer/vhive_table_writer.cpp | 1 +
be/src/vec/sink/writer/vmysql_table_writer.cpp | 8 ++---
be/src/vec/sink/writer/vodbc_table_writer.cpp | 1 +
be/src/vec/sink/writer/vtablet_writer.cpp | 11 ++++---
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 3 +-
13 files changed, 64 insertions(+), 42 deletions(-)
diff --git a/be/src/vec/exec/format/table/iceberg/struct_like.h
b/be/src/vec/exec/format/table/iceberg/struct_like.h
index 23d394066fc..cf02e2d1eb8 100644
--- a/be/src/vec/exec/format/table/iceberg/struct_like.h
+++ b/be/src/vec/exec/format/table/iceberg/struct_like.h
@@ -26,11 +26,8 @@ namespace iceberg {
class StructLike {
public:
virtual ~StructLike() = default;
- virtual int size() const = 0;
- virtual std::any get(int pos) const = 0;
-
- virtual void set(int pos, const std::any& value) = 0;
+ virtual std::any get(size_t pos) const = 0;
};
} // namespace iceberg
diff --git a/be/src/vec/sink/vrow_distribution.cpp
b/be/src/vec/sink/vrow_distribution.cpp
index f374064c0af..4a9790a4437 100644
--- a/be/src/vec/sink/vrow_distribution.cpp
+++ b/be/src/vec/sink/vrow_distribution.cpp
@@ -25,6 +25,7 @@
#include <memory>
#include <string>
+#include "common/cast_set.h"
#include "common/logging.h"
#include "common/status.h"
#include "runtime/client_cache.h"
@@ -225,7 +226,10 @@ void
VRowDistribution::_filter_block_by_skip(vectorized::Block* block,
auto& partition_ids = row_part_tablet_id.partition_ids;
auto& tablet_ids = row_part_tablet_id.tablet_ids;
- for (size_t i = 0; i < block->rows(); i++) {
+ auto rows = block->rows();
+ // row count of a block should not exceed UINT32_MAX
+ auto rows_uint32 = cast_set<uint32_t>(rows);
+ for (uint32_t i = 0; i < rows_uint32; i++) {
if (!_skip[i]) {
row_ids.emplace_back(i);
partition_ids.emplace_back(_partitions[i]->id);
@@ -250,7 +254,10 @@ Status
VRowDistribution::_filter_block_by_skip_and_where_clause(
auto& tablet_ids = row_part_tablet_id.tablet_ids;
if (const auto* nullable_column =
vectorized::check_and_get_column<vectorized::ColumnNullable>(*filter_column)) {
- for (size_t i = 0; i < block->rows(); i++) {
+ auto rows = block->rows();
+ // row count of a block should not exceed UINT32_MAX
+ auto rows_uint32 = cast_set<uint32_t>(rows);
+ for (uint32_t i = 0; i < rows_uint32; i++) {
if (nullable_column->get_bool_inline(i) && !_skip[i]) {
row_ids.emplace_back(i);
partition_ids.emplace_back(_partitions[i]->id);
@@ -267,7 +274,10 @@ Status
VRowDistribution::_filter_block_by_skip_and_where_clause(
_filter_block_by_skip(block, row_part_tablet_id);
} else {
const auto& filter = assert_cast<const
vectorized::ColumnUInt8&>(*filter_column).get_data();
- for (size_t i = 0; i < block->rows(); i++) {
+ auto rows = block->rows();
+ // row count of a block should not exceed UINT32_MAX
+ auto rows_uint32 = cast_set<uint32_t>(rows);
+ for (uint32_t i = 0; i < rows_uint32; i++) {
if (filter[i] != 0 && !_skip[i]) {
row_ids.emplace_back(i);
partition_ids.emplace_back(_partitions[i]->id);
diff --git a/be/src/vec/sink/vrow_distribution.h
b/be/src/vec/sink/vrow_distribution.h
index 40202556290..6248a28dba5 100644
--- a/be/src/vec/sink/vrow_distribution.h
+++ b/be/src/vec/sink/vrow_distribution.h
@@ -48,7 +48,7 @@ class VNodeChannel;
// <row_idx, partition_id, tablet_id>
class RowPartTabletIds {
public:
- std::vector<int64_t> row_ids;
+ std::vector<uint32_t> row_ids;
std::vector<int64_t> partition_ids;
std::vector<int64_t> tablet_ids;
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp
b/be/src/vec/sink/writer/async_result_writer.cpp
index c17b84b2dbe..65210a53ec3 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -32,6 +32,7 @@ class RowDescriptor;
class TExpr;
namespace vectorized {
+#include "common/compile_check_begin.h"
AsyncResultWriter::AsyncResultWriter(const
doris::vectorized::VExprContextSPtrs& output_expr_ctxs,
std::shared_ptr<pipeline::Dependency> dep,
@@ -225,7 +226,7 @@ void
AsyncResultWriter::_return_free_block(std::unique_ptr<Block> b) {
}
std::unique_ptr<Block>
AsyncResultWriter::_get_free_block(doris::vectorized::Block* block,
- int rows) {
+ size_t rows) {
std::unique_ptr<Block> b;
if (!_free_blocks.try_dequeue(b)) {
b = block->create_same_struct_block(rows, true);
diff --git a/be/src/vec/sink/writer/async_result_writer.h
b/be/src/vec/sink/writer/async_result_writer.h
index 36bca48358a..513f2aa7984 100644
--- a/be/src/vec/sink/writer/async_result_writer.h
+++ b/be/src/vec/sink/writer/async_result_writer.h
@@ -76,7 +76,7 @@ protected:
Status _projection_block(Block& input_block, Block* output_block);
const VExprContextSPtrs& _vec_output_expr_ctxs;
- std::unique_ptr<Block> _get_free_block(Block*, int rows);
+ std::unique_ptr<Block> _get_free_block(Block*, size_t rows);
void _return_free_block(std::unique_ptr<Block>);
diff --git a/be/src/vec/sink/writer/iceberg/partition_data.h
b/be/src/vec/sink/writer/iceberg/partition_data.h
index 512dbd47904..d3dfb1e8ecc 100644
--- a/be/src/vec/sink/writer/iceberg/partition_data.h
+++ b/be/src/vec/sink/writer/iceberg/partition_data.h
@@ -21,31 +21,24 @@
namespace doris {
namespace vectorized {
+#include "common/compile_check_begin.h"
class PartitionData : public iceberg::StructLike {
public:
explicit PartitionData(std::vector<std::any> partition_values)
: _partition_values(std::move(partition_values)) {}
- int size() const override { return _partition_values.size(); }
-
- std::any get(int pos) const override {
- if (pos < 0 || pos >= _partition_values.size()) {
+ std::any get(size_t pos) const override {
+ if (pos >= _partition_values.size()) {
throw std::out_of_range("Index out of range");
}
return _partition_values[pos];
}
- void set(int pos, const std::any& value) override {
- if (pos < 0 || pos >= _partition_values.size()) {
- throw std::out_of_range("Index out of range");
- }
- _partition_values[pos] = value;
- }
-
private:
std::vector<std::any> _partition_values;
};
} // namespace vectorized
} // namespace doris
+#include "common/compile_check_end.h"
diff --git a/be/src/vec/sink/writer/iceberg/partition_transformers.h
b/be/src/vec/sink/writer/iceberg/partition_transformers.h
index 84ee3029cdd..79eb385b298 100644
--- a/be/src/vec/sink/writer/iceberg/partition_transformers.h
+++ b/be/src/vec/sink/writer/iceberg/partition_transformers.h
@@ -30,6 +30,7 @@ class PartitionField;
}; // namespace iceberg
namespace vectorized {
+#include "common/compile_check_begin.h"
class IColumn;
class PartitionColumnTransform;
@@ -174,7 +175,7 @@ public:
temp_arguments[0] = 0; // str column
temp_arguments[1] = 1; // pos
temp_arguments[2] = 2; // width
- size_t result_column_id = 3;
+ uint32_t result_column_id = 3;
SubstringUtil::substring_execute(temp_block, temp_arguments,
result_column_id,
temp_block.rows());
@@ -623,9 +624,9 @@ public:
DateV2Value<DateV2ValueType> value =
binary_cast<uint32_t,
DateV2Value<DateV2ValueType>>(*(UInt32*)p_in);
- int32_t days_from_unix_epoch = value.daynr() - 719528;
- Int64 long_value = static_cast<Int64>(days_from_unix_epoch);
- uint32_t hash_value = HashUtil::murmur_hash3_32(&long_value,
sizeof(long_value), 0);
+ int64_t days_from_unix_epoch = value.daynr() - 719528;
+ uint32_t hash_value =
HashUtil::murmur_hash3_32(&days_from_unix_epoch,
+
sizeof(days_from_unix_epoch), 0);
*p_out = (hash_value & INT32_MAX) % _bucket_num;
++p_in;
@@ -836,7 +837,9 @@ public:
while (p_in < end_in) {
DateV2Value<DateV2ValueType> value =
binary_cast<uint32_t,
DateV2Value<DateV2ValueType>>(*(UInt32*)p_in);
- *p_out =
datetime_diff<YEAR>(PartitionColumnTransformUtils::epoch_date(), value);
+ // datetime_diff<YEAR> actually returns int
+ *p_out = cast_set<int, int64_t, false>(
+
datetime_diff<YEAR>(PartitionColumnTransformUtils::epoch_date(), value));
++p_in;
++p_out;
}
@@ -906,7 +909,9 @@ public:
while (p_in < end_in) {
DateV2Value<DateTimeV2ValueType> value =
binary_cast<uint64_t,
DateV2Value<DateTimeV2ValueType>>(*(UInt64*)p_in);
- *p_out =
datetime_diff<YEAR>(PartitionColumnTransformUtils::epoch_datetime(), value);
+ // datetime_diff<YEAR> actually returns int
+ *p_out = cast_set<int, int64_t, false>(
+
datetime_diff<YEAR>(PartitionColumnTransformUtils::epoch_datetime(), value));
++p_in;
++p_out;
}
@@ -976,7 +981,9 @@ public:
while (p_in < end_in) {
DateV2Value<DateV2ValueType> value =
binary_cast<uint32_t,
DateV2Value<DateV2ValueType>>(*(UInt32*)p_in);
- *p_out =
datetime_diff<MONTH>(PartitionColumnTransformUtils::epoch_date(), value);
+ // datetime_diff<MONTH> actually returns int
+ *p_out = cast_set<int, int64_t, false>(
+
datetime_diff<MONTH>(PartitionColumnTransformUtils::epoch_date(), value));
++p_in;
++p_out;
}
@@ -1046,7 +1053,9 @@ public:
while (p_in < end_in) {
DateV2Value<DateTimeV2ValueType> value =
binary_cast<uint64_t,
DateV2Value<DateTimeV2ValueType>>(*(UInt64*)p_in);
- *p_out =
datetime_diff<MONTH>(PartitionColumnTransformUtils::epoch_datetime(), value);
+ // datetime_diff<MONTH> actually returns int
+ *p_out = cast_set<int, int64_t, false>(
+
datetime_diff<MONTH>(PartitionColumnTransformUtils::epoch_datetime(), value));
++p_in;
++p_out;
}
@@ -1116,7 +1125,9 @@ public:
while (p_in < end_in) {
DateV2Value<DateV2ValueType> value =
binary_cast<uint32_t,
DateV2Value<DateV2ValueType>>(*(UInt32*)p_in);
- *p_out =
datetime_diff<DAY>(PartitionColumnTransformUtils::epoch_date(), value);
+ // datetime_diff<DAY> actually returns int
+ *p_out = cast_set<int, int64_t, false>(
+
datetime_diff<DAY>(PartitionColumnTransformUtils::epoch_date(), value));
++p_in;
++p_out;
}
@@ -1192,7 +1203,9 @@ public:
while (p_in < end_in) {
DateV2Value<DateTimeV2ValueType> value =
binary_cast<uint64_t,
DateV2Value<DateTimeV2ValueType>>(*(UInt64*)p_in);
- *p_out =
datetime_diff<DAY>(PartitionColumnTransformUtils::epoch_datetime(), value);
+ // datetime_diff<DAY> actually returns int
+ *p_out = cast_set<int, int64_t, false>(
+
datetime_diff<DAY>(PartitionColumnTransformUtils::epoch_datetime(), value));
++p_in;
++p_out;
}
@@ -1267,7 +1280,9 @@ public:
while (p_in < end_in) {
DateV2Value<DateTimeV2ValueType> value =
binary_cast<uint64_t,
DateV2Value<DateTimeV2ValueType>>(*(UInt64*)p_in);
- *p_out =
datetime_diff<HOUR>(PartitionColumnTransformUtils::epoch_datetime(), value);
+ // hour diff wouldn't overflow int32
+ *p_out = cast_set<int, int64_t, false>(
+
datetime_diff<HOUR>(PartitionColumnTransformUtils::epoch_datetime(), value));
++p_in;
++p_out;
}
@@ -1333,3 +1348,4 @@ private:
} // namespace vectorized
} // namespace doris
+#include "common/compile_check_end.h"
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
index 280cf8b8107..29c97b59ea4 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
@@ -31,6 +31,7 @@
namespace doris {
namespace vectorized {
+#include "common/compile_check_begin.h"
VIcebergTableWriter::VIcebergTableWriter(const TDataSink& t_sink,
const VExprContextSPtrs&
output_expr_ctxs,
diff --git a/be/src/vec/sink/writer/vhive_table_writer.cpp
b/be/src/vec/sink/writer/vhive_table_writer.cpp
index 6eb478c01b7..17e6bba326f 100644
--- a/be/src/vec/sink/writer/vhive_table_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_table_writer.cpp
@@ -28,6 +28,7 @@
namespace doris {
namespace vectorized {
+#include "common/compile_check_begin.h"
VHiveTableWriter::VHiveTableWriter(const TDataSink& t_sink,
const VExprContextSPtrs& output_expr_ctxs,
diff --git a/be/src/vec/sink/writer/vmysql_table_writer.cpp
b/be/src/vec/sink/writer/vmysql_table_writer.cpp
index a0d47ffec1e..e4529c59c9a 100644
--- a/be/src/vec/sink/writer/vmysql_table_writer.cpp
+++ b/be/src/vec/sink/writer/vmysql_table_writer.cpp
@@ -51,6 +51,7 @@
namespace doris {
namespace vectorized {
+#include "common/compile_check_begin.h"
std::string MysqlConnInfo::debug_string() const {
std::stringstream ss;
@@ -124,9 +125,9 @@ Status VMysqlTableWriter::write(RuntimeState* state,
vectorized::Block& block) {
Status VMysqlTableWriter::_insert_row(vectorized::Block& block, size_t row) {
_insert_stmt_buffer.clear();
fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (",
_conn_info.table_name);
- int num_columns = _vec_output_expr_ctxs.size();
+ size_t num_columns = _vec_output_expr_ctxs.size();
- for (int i = 0; i < num_columns; ++i) {
+ for (size_t i = 0; i < num_columns; ++i) {
auto& column_ptr = block.get_by_position(i).column;
auto& type_ptr = block.get_by_position(i).type;
@@ -236,8 +237,7 @@ Status VMysqlTableWriter::_insert_row(vectorized::Block&
block, size_t row) {
break;
}
case TYPE_DATETIMEV2: {
- uint32_t int_val =
- assert_cast<const
vectorized::ColumnUInt64&>(*column).get_data()[row];
+ auto int_val = assert_cast<const
vectorized::ColumnUInt64&>(*column).get_data()[row];
DateV2Value<DateTimeV2ValueType> value =
binary_cast<uint64_t,
DateV2Value<DateTimeV2ValueType>>(int_val);
diff --git a/be/src/vec/sink/writer/vodbc_table_writer.cpp
b/be/src/vec/sink/writer/vodbc_table_writer.cpp
index 19cb2e50109..d99dfc56aaa 100644
--- a/be/src/vec/sink/writer/vodbc_table_writer.cpp
+++ b/be/src/vec/sink/writer/vodbc_table_writer.cpp
@@ -29,6 +29,7 @@
namespace doris {
namespace vectorized {
+#include "common/compile_check_begin.h"
ODBCConnectorParam VOdbcTableWriter::create_connect_param(const
doris::TDataSink& t_sink) {
const TOdbcTableSink& t_odbc_sink = t_sink.odbc_table_sink;
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp
b/be/src/vec/sink/writer/vtablet_writer.cpp
index 504ffb9cb74..55b6845b6bc 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -92,6 +92,7 @@ namespace doris {
class TExpr;
namespace vectorized {
+#include "common/compile_check_begin.h"
bvar::Adder<int64_t> g_sink_write_bytes;
bvar::PerSecond<bvar::Adder<int64_t>>
g_sink_write_bytes_per_second("sink_throughput_byte",
@@ -662,14 +663,14 @@ void VNodeChannel::try_send_pending_block(RuntimeState*
state) {
_send_block_callback->clear_in_flight();
return;
}
- if (compressed_bytes >= double(config::brpc_max_body_size) * 0.95F) {
+ if (double(compressed_bytes) >= double(config::brpc_max_body_size) *
0.95F) {
LOG(WARNING) << "send block too large, this rpc may failed. send
size: "
<< compressed_bytes << ", threshold: " <<
config::brpc_max_body_size
<< ", " << channel_info();
}
}
- int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() /
NANOS_PER_MILLIS;
+ auto remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() /
NANOS_PER_MILLIS;
if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) {
if (remain_ms <= 0 && !request->eos()) {
cancel(fmt::format("{}, err: timeout", channel_info()));
@@ -847,7 +848,7 @@ void VNodeChannel::_add_block_success_callback(const
PTabletWriterAddBlockResult
if (result.has_load_channel_profile()) {
TRuntimeProfileTree tprofile;
const auto* buf = (const uint8_t*)result.load_channel_profile().data();
- uint32_t len = result.load_channel_profile().size();
+ auto len = cast_set<uint32_t>(result.load_channel_profile().size());
auto st = deserialize_thrift_msg(buf, &len, false, &tprofile);
if (st.ok()) {
_state->load_channel_profile()->update(tprofile);
@@ -917,7 +918,7 @@ void VNodeChannel::cancel(const std::string& cancel_msg) {
PTabletWriterCancelRequest,
DummyBrpcCallback<PTabletWriterCancelResult>>::create_unique(request,
cancel_callback);
- int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() /
NANOS_PER_MILLIS;
+ auto remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() /
NANOS_PER_MILLIS;
if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) {
remain_ms = config::min_load_rpc_timeout_ms;
}
@@ -1706,7 +1707,7 @@ void VTabletWriter::_generate_one_index_channel_payload(
size_t row_cnt = row_ids.size();
- for (int i = 0; i < row_ids.size(); i++) {
+ for (size_t i = 0; i < row_ids.size(); i++) {
// (tablet_id, VNodeChannel) where this tablet locate
auto it =
_channels[index_idx]->_channels_by_tablet.find(tablet_ids[i]);
DCHECK(it != _channels[index_idx]->_channels_by_tablet.end())
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index cd196a8f2b3..f3ad6b1d5e3 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -55,6 +55,7 @@
#include "vec/sink/vtablet_finder.h"
namespace doris::vectorized {
+#include "common/compile_check_begin.h"
VTabletWriterV2::VTabletWriterV2(const TDataSink& t_sink, const
VExprContextSPtrs& output_exprs,
std::shared_ptr<pipeline::Dependency> dep,
@@ -359,7 +360,7 @@ void
VTabletWriterV2::_generate_rows_for_tablet(std::vector<RowPartTabletIds>& r
auto& partition_ids = row_part_tablet_ids[index_idx].partition_ids;
auto& tablet_ids = row_part_tablet_ids[index_idx].tablet_ids;
- for (int i = 0; i < row_ids.size(); i++) {
+ for (size_t i = 0; i < row_ids.size(); i++) {
auto& tablet_id = tablet_ids[i];
auto it = rows_for_tablet.find(tablet_id);
if (it == rows_for_tablet.end()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]