This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 28e088aee1 [optimization](be) optimization for ColumnConst when
writing mysql result (#19122)
28e088aee1 is described below
commit 28e088aee13ddb6319701fa1480b1aef8e468525
Author: zclllyybb <[email protected]>
AuthorDate: Thu May 11 01:04:18 2023 +0800
[optimization](be) optimization for ColumnConst when writing mysql result
(#19122)
* opt for result
* fix
---
be/src/vec/sink/vmysql_result_writer.cpp | 203 +++++++++++++++++--------------
be/src/vec/sink/vmysql_result_writer.h | 12 +-
2 files changed, 115 insertions(+), 100 deletions(-)
diff --git a/be/src/vec/sink/vmysql_result_writer.cpp
b/be/src/vec/sink/vmysql_result_writer.cpp
index 8896c48e86..ce2498009e 100644
--- a/be/src/vec/sink/vmysql_result_writer.cpp
+++ b/be/src/vec/sink/vmysql_result_writer.cpp
@@ -17,6 +17,7 @@
#include "vec/sink/vmysql_result_writer.h"
+#include <fmt/core.h>
#include <gen_cpp/Data_types.h>
#include <gen_cpp/Metrics_types.h>
#include <glog/logging.h>
@@ -47,6 +48,7 @@
#include "vec/columns/column.h"
#include "vec/columns/column_array.h"
#include "vec/columns/column_complex.h"
+#include "vec/columns/column_const.h"
#include "vec/columns/column_decimal.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_struct.h"
@@ -103,15 +105,12 @@ template <bool is_binary_format>
template <PrimitiveType type, bool is_nullable>
Status VMysqlResultWriter<is_binary_format>::_add_one_column(
const ColumnPtr& column_ptr, std::unique_ptr<TFetchDataResult>& result,
- std::vector<MysqlRowBuffer<is_binary_format>>& rows_buffer, int scale,
+ std::vector<MysqlRowBuffer<is_binary_format>>& rows_buffer, bool
arg_const, int scale,
const DataTypes& sub_types) {
SCOPED_TIMER(_convert_tuple_timer);
- const auto row_size = column_ptr->size();
- if (rows_buffer.size() != row_size) {
- return Status::Error<ErrorCode::INTERNAL_ERROR>("row_size({}) !=
rows_buffer.size({})",
- row_size,
rows_buffer.size());
- }
+ // If arg_const is true, column_ptr is the unpacked single-row data column, so the row count is taken from rows_buffer.
+ const auto row_size = rows_buffer.size();
doris::vectorized::ColumnPtr column;
if constexpr (is_nullable) {
@@ -129,8 +128,10 @@ Status
VMysqlResultWriter<is_binary_format>::_add_one_column(
return Status::InternalError("pack mysql buffer failed.");
}
+ const auto col_index = index_check_const(i, arg_const);
+
if constexpr (is_nullable) {
- if (column_ptr->is_null_at(i)) {
+ if (column_ptr->is_null_at(col_index)) {
buf_ret = rows_buffer[i].push_null();
continue;
}
@@ -141,7 +142,7 @@ Status
VMysqlResultWriter<is_binary_format>::_add_one_column(
const vectorized::ColumnComplexType<BitmapValue>*
pColumnComplexType =
assert_cast<const
vectorized::ColumnComplexType<BitmapValue>*>(
column.get());
- BitmapValue bitmapValue =
pColumnComplexType->get_element(i);
+ BitmapValue bitmapValue =
pColumnComplexType->get_element(col_index);
size_t size = bitmapValue.getSizeInBytes();
std::unique_ptr<char[]> buf =
std::make_unique<char[]>(size);
bitmapValue.write_to(buf.get());
@@ -150,7 +151,7 @@ Status
VMysqlResultWriter<is_binary_format>::_add_one_column(
const vectorized::ColumnComplexType<HyperLogLog>*
pColumnComplexType =
assert_cast<const
vectorized::ColumnComplexType<HyperLogLog>*>(
column.get());
- HyperLogLog hyperLogLog =
pColumnComplexType->get_element(i);
+ HyperLogLog hyperLogLog =
pColumnComplexType->get_element(col_index);
size_t size = hyperLogLog.max_serialized_size();
std::unique_ptr<char[]> buf =
std::make_unique<char[]>(size);
hyperLogLog.serialize((uint8*)buf.get());
@@ -160,7 +161,7 @@ Status
VMysqlResultWriter<is_binary_format>::_add_one_column(
const vectorized::ColumnComplexType<QuantileStateDouble>*
pColumnComplexType =
assert_cast<const
vectorized::ColumnComplexType<QuantileStateDouble>*>(
column.get());
- QuantileStateDouble quantileValue =
pColumnComplexType->get_element(i);
+ QuantileStateDouble quantileValue =
pColumnComplexType->get_element(col_index);
size_t size = quantileValue.get_serialized_size();
std::unique_ptr<char[]> buf =
std::make_unique<char[]>(size);
quantileValue.serialize((uint8_t*)buf.get());
@@ -170,7 +171,7 @@ Status
VMysqlResultWriter<is_binary_format>::_add_one_column(
}
}
if constexpr (type == TYPE_VARCHAR) {
- const auto string_val = column->get_data_at(i);
+ const auto string_val = column->get_data_at(col_index);
if (string_val.data == nullptr) {
if (string_val.size == 0) {
@@ -185,7 +186,7 @@ Status
VMysqlResultWriter<is_binary_format>::_add_one_column(
}
}
if constexpr (type == TYPE_JSONB) {
- const auto jsonb_val = column->get_data_at(i);
+ const auto jsonb_val = column->get_data_at(col_index);
// jsonb size == 0 is NULL
if (jsonb_val.data == nullptr || jsonb_val.size == 0) {
buf_ret = rows_buffer[i].push_null();
@@ -205,8 +206,10 @@ Status
VMysqlResultWriter<is_binary_format>::_add_one_column(
return Status::InternalError("pack mysql buffer failed.");
}
+ const auto col_index = index_check_const(i, arg_const);
+
if constexpr (is_nullable) {
- if (column_ptr->is_null_at(i)) {
+ if (column_ptr->is_null_at(col_index)) {
buf_ret = rows_buffer[i].push_null();
continue;
}
@@ -215,7 +218,7 @@ Status
VMysqlResultWriter<is_binary_format>::_add_one_column(
rows_buffer[i].open_dynamic_mode();
buf_ret = rows_buffer[i].push_string("[", 1);
bool begin = true;
- for (auto j = offsets[i - 1]; j < offsets[i]; ++j) {
+ for (auto j = offsets[col_index - 1]; j < offsets[col_index]; ++j)
{
if (!begin) {
buf_ret = rows_buffer[i].push_string(", ", 2);
}
@@ -244,14 +247,16 @@ Status
VMysqlResultWriter<is_binary_format>::_add_one_column(
return Status::InternalError("pack mysql buffer failed.");
}
+ const auto col_index = index_check_const(i, arg_const);
+
if constexpr (is_nullable) {
- if (column_ptr->is_null_at(i)) {
+ if (column_ptr->is_null_at(col_index)) {
buf_ret = rows_buffer[i].push_null();
continue;
}
}
rows_buffer[i].open_dynamic_mode();
- std::string cell_str = map_type.to_string(*column, i);
+ std::string cell_str = map_type.to_string(*column, col_index);
buf_ret = rows_buffer[i].push_string(cell_str.c_str(),
strlen(cell_str.c_str()));
rows_buffer[i].close_dynamic_mode();
@@ -264,8 +269,10 @@ Status
VMysqlResultWriter<is_binary_format>::_add_one_column(
return Status::InternalError("pack mysql buffer failed.");
}
+ const auto col_index = index_check_const(i, arg_const);
+
if constexpr (is_nullable) {
- if (column_ptr->is_null_at(i)) {
+ if (column_ptr->is_null_at(col_index)) {
buf_ret = rows_buffer[i].push_null();
continue;
}
@@ -279,15 +286,15 @@ Status
VMysqlResultWriter<is_binary_format>::_add_one_column(
buf_ret = rows_buffer[i].push_string(", ", 2);
}
const auto& data = column_struct.get_column_ptr(j);
- if (data->is_null_at(i)) {
+ if (data->is_null_at(col_index)) {
buf_ret = rows_buffer[i].push_string("NULL",
strlen("NULL"));
} else {
if
(WhichDataType(remove_nullable(sub_types[j])).is_string()) {
buf_ret = rows_buffer[i].push_string("'", 1);
- buf_ret = _add_one_cell(data, i, sub_types[j],
rows_buffer[i]);
+ buf_ret = _add_one_cell(data, col_index, sub_types[j],
rows_buffer[i]);
buf_ret = rows_buffer[i].push_string("'", 1);
} else {
- buf_ret = _add_one_cell(data, i, sub_types[j],
rows_buffer[i]);
+ buf_ret = _add_one_cell(data, col_index, sub_types[j],
rows_buffer[i]);
}
}
begin = false;
@@ -303,13 +310,15 @@ Status
VMysqlResultWriter<is_binary_format>::_add_one_column(
return Status::InternalError("pack mysql buffer failed.");
}
+ const auto col_index = index_check_const(i, arg_const);
+
if constexpr (is_nullable) {
- if (column_ptr->is_null_at(i)) {
+ if (column_ptr->is_null_at(col_index)) {
buf_ret = rows_buffer[i].push_null();
continue;
}
}
- std::string decimal_str = sub_types[0]->to_string(*column, i);
+ std::string decimal_str = sub_types[0]->to_string(*column,
col_index);
buf_ret = rows_buffer[i].push_string(decimal_str.c_str(),
decimal_str.length());
}
} else {
@@ -321,8 +330,10 @@ Status
VMysqlResultWriter<is_binary_format>::_add_one_column(
return Status::InternalError("pack mysql buffer failed.");
}
+ const auto col_index = index_check_const(i, arg_const);
+
if constexpr (is_nullable) {
- if (column_ptr->is_null_at(i)) {
+ if (column_ptr->is_null_at(col_index)) {
buf_ret = rows_buffer[i].push_null();
continue;
}
@@ -330,46 +341,46 @@ Status
VMysqlResultWriter<is_binary_format>::_add_one_column(
if constexpr (type == TYPE_BOOLEAN) {
//todo here need to using uint after MysqlRowBuffer support it
- buf_ret = rows_buffer[i].push_tinyint(data[i]);
+ buf_ret = rows_buffer[i].push_tinyint(data[col_index]);
}
if constexpr (type == TYPE_TINYINT) {
- buf_ret = rows_buffer[i].push_tinyint(data[i]);
+ buf_ret = rows_buffer[i].push_tinyint(data[col_index]);
}
if constexpr (type == TYPE_SMALLINT) {
- buf_ret = rows_buffer[i].push_smallint(data[i]);
+ buf_ret = rows_buffer[i].push_smallint(data[col_index]);
}
if constexpr (type == TYPE_INT) {
- buf_ret = rows_buffer[i].push_int(data[i]);
+ buf_ret = rows_buffer[i].push_int(data[col_index]);
}
if constexpr (type == TYPE_BIGINT) {
- buf_ret = rows_buffer[i].push_bigint(data[i]);
+ buf_ret = rows_buffer[i].push_bigint(data[col_index]);
}
if constexpr (type == TYPE_LARGEINT) {
- auto v = LargeIntValue::to_string(data[i]);
+ auto v = LargeIntValue::to_string(data[col_index]);
buf_ret = rows_buffer[i].push_string(v.c_str(), v.size());
}
if constexpr (type == TYPE_FLOAT) {
- buf_ret = rows_buffer[i].push_float(data[i]);
+ buf_ret = rows_buffer[i].push_float(data[col_index]);
}
if constexpr (type == TYPE_DOUBLE) {
- buf_ret = rows_buffer[i].push_double(data[i]);
+ buf_ret = rows_buffer[i].push_double(data[col_index]);
}
if constexpr (type == TYPE_TIME || type == TYPE_TIMEV2) {
- buf_ret = rows_buffer[i].push_time(data[i]);
+ buf_ret = rows_buffer[i].push_time(data[col_index]);
}
if constexpr (type == TYPE_DATETIME) {
- auto time_num = data[i];
+ auto time_num = data[col_index];
VecDateTimeValue time_val = binary_cast<Int64,
VecDateTimeValue>(time_num);
buf_ret = rows_buffer[i].push_vec_datetime(time_val);
}
if constexpr (type == TYPE_DATEV2) {
- auto time_num = data[i];
+ auto time_num = data[col_index];
DateV2Value<DateV2ValueType> date_val =
binary_cast<UInt32,
DateV2Value<DateV2ValueType>>(time_num);
buf_ret = rows_buffer[i].push_vec_datetime(date_val);
}
if constexpr (type == TYPE_DATETIMEV2) {
- auto time_num = data[i];
+ auto time_num = data[col_index];
char buf[64];
DateV2Value<DateTimeV2ValueType> date_val =
binary_cast<UInt64,
DateV2Value<DateTimeV2ValueType>>(time_num);
@@ -377,7 +388,7 @@ Status
VMysqlResultWriter<is_binary_format>::_add_one_column(
buf_ret = rows_buffer[i].push_string(buf, pos - buf - 1);
}
if constexpr (type == TYPE_DECIMALV2) {
- DecimalV2Value decimal_val(data[i]);
+ DecimalV2Value decimal_val(data[col_index]);
auto decimal_str = decimal_val.to_string(scale);
buf_ret = rows_buffer[i].push_string(decimal_str.c_str(),
decimal_str.length());
}
@@ -604,107 +615,111 @@ Status
VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
// convert one batch
auto result = std::make_unique<TFetchDataResult>();
for (int i = 0; status.ok() && i < _output_vexpr_ctxs.size(); ++i) {
- auto column_ptr =
block.get_by_position(i).column->convert_to_full_column_if_const();
+ const auto& [column_ptr, col_const] =
unpack_if_const(block.get_by_position(i).column);
auto type_ptr = block.get_by_position(i).type;
+ DCHECK(num_rows == block.get_by_position(i).column->size())
+ << fmt::format("block's rows({}) != column{}'s size({})",
num_rows, i,
+ block.get_by_position(i).column->size());
+
int scale = _output_vexpr_ctxs[i]->root()->type().scale;
switch (_output_vexpr_ctxs[i]->root()->result_type()) {
case TYPE_BOOLEAN:
if (type_ptr->is_nullable()) {
status = _add_one_column<PrimitiveType::TYPE_BOOLEAN,
true>(column_ptr, result,
-
rows_buffer);
+
rows_buffer, col_const);
} else {
- status = _add_one_column<PrimitiveType::TYPE_BOOLEAN,
false>(column_ptr, result,
-
rows_buffer);
+ status = _add_one_column<PrimitiveType::TYPE_BOOLEAN, false>(
+ column_ptr, result, rows_buffer, col_const);
}
break;
case TYPE_TINYINT: {
if (type_ptr->is_nullable()) {
status = _add_one_column<PrimitiveType::TYPE_TINYINT,
true>(column_ptr, result,
-
rows_buffer);
+
rows_buffer, col_const);
} else {
- status = _add_one_column<PrimitiveType::TYPE_TINYINT,
false>(column_ptr, result,
-
rows_buffer);
+ status = _add_one_column<PrimitiveType::TYPE_TINYINT, false>(
+ column_ptr, result, rows_buffer, col_const);
}
break;
}
case TYPE_SMALLINT: {
if (type_ptr->is_nullable()) {
- status = _add_one_column<PrimitiveType::TYPE_SMALLINT,
true>(column_ptr, result,
-
rows_buffer);
+ status = _add_one_column<PrimitiveType::TYPE_SMALLINT, true>(
+ column_ptr, result, rows_buffer, col_const);
} else {
- status = _add_one_column<PrimitiveType::TYPE_SMALLINT,
false>(column_ptr, result,
-
rows_buffer);
+ status = _add_one_column<PrimitiveType::TYPE_SMALLINT, false>(
+ column_ptr, result, rows_buffer, col_const);
}
break;
}
case TYPE_INT: {
if (type_ptr->is_nullable()) {
status = _add_one_column<PrimitiveType::TYPE_INT,
true>(column_ptr, result,
-
rows_buffer);
+
rows_buffer, col_const);
} else {
status = _add_one_column<PrimitiveType::TYPE_INT,
false>(column_ptr, result,
-
rows_buffer);
+
rows_buffer, col_const);
}
break;
}
case TYPE_BIGINT: {
if (type_ptr->is_nullable()) {
status = _add_one_column<PrimitiveType::TYPE_BIGINT,
true>(column_ptr, result,
-
rows_buffer);
+
rows_buffer, col_const);
} else {
status = _add_one_column<PrimitiveType::TYPE_BIGINT,
false>(column_ptr, result,
-
rows_buffer);
+
rows_buffer, col_const);
}
break;
}
case TYPE_LARGEINT: {
if (type_ptr->is_nullable()) {
- status = _add_one_column<PrimitiveType::TYPE_LARGEINT,
true>(column_ptr, result,
-
rows_buffer);
+ status = _add_one_column<PrimitiveType::TYPE_LARGEINT, true>(
+ column_ptr, result, rows_buffer, col_const);
} else {
- status = _add_one_column<PrimitiveType::TYPE_LARGEINT,
false>(column_ptr, result,
-
rows_buffer);
+ status = _add_one_column<PrimitiveType::TYPE_LARGEINT, false>(
+ column_ptr, result, rows_buffer, col_const);
}
break;
}
case TYPE_FLOAT: {
if (type_ptr->is_nullable()) {
status = _add_one_column<PrimitiveType::TYPE_FLOAT,
true>(column_ptr, result,
-
rows_buffer);
+
rows_buffer, col_const);
} else {
status = _add_one_column<PrimitiveType::TYPE_FLOAT,
false>(column_ptr, result,
-
rows_buffer);
+
rows_buffer, col_const);
}
break;
}
case TYPE_DOUBLE: {
if (type_ptr->is_nullable()) {
status = _add_one_column<PrimitiveType::TYPE_DOUBLE,
true>(column_ptr, result,
-
rows_buffer);
+
rows_buffer, col_const);
} else {
status = _add_one_column<PrimitiveType::TYPE_DOUBLE,
false>(column_ptr, result,
-
rows_buffer);
+
rows_buffer, col_const);
}
break;
}
case TYPE_TIME: {
if (type_ptr->is_nullable()) {
status = _add_one_column<PrimitiveType::TYPE_TIME,
true>(column_ptr, result,
-
rows_buffer);
+
rows_buffer, col_const);
} else {
status = _add_one_column<PrimitiveType::TYPE_TIME,
false>(column_ptr, result,
-
rows_buffer);
+
rows_buffer, col_const);
}
break;
}
case TYPE_TIMEV2: {
if (type_ptr->is_nullable()) {
status = _add_one_column<PrimitiveType::TYPE_TIMEV2,
true>(column_ptr, result,
-
rows_buffer);
+
rows_buffer, col_const);
} else {
status = _add_one_column<PrimitiveType::TYPE_TIMEV2,
false>(column_ptr, result,
-
rows_buffer);
+
rows_buffer, col_const);
}
break;
}
@@ -714,10 +729,10 @@ Status
VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
case TYPE_AGG_STATE: {
if (type_ptr->is_nullable()) {
status = _add_one_column<PrimitiveType::TYPE_VARCHAR,
true>(column_ptr, result,
-
rows_buffer);
+
rows_buffer, col_const);
} else {
- status = _add_one_column<PrimitiveType::TYPE_VARCHAR,
false>(column_ptr, result,
-
rows_buffer);
+ status = _add_one_column<PrimitiveType::TYPE_VARCHAR, false>(
+ column_ptr, result, rows_buffer, col_const);
}
break;
}
@@ -726,10 +741,10 @@ Status
VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
auto& nested_type =
assert_cast<const
DataTypeNullable&>(*type_ptr).get_nested_type();
status = _add_one_column<PrimitiveType::TYPE_DECIMALV2, true>(
- column_ptr, result, rows_buffer, scale, {nested_type});
+ column_ptr, result, rows_buffer, col_const, scale,
{nested_type});
} else {
status = _add_one_column<PrimitiveType::TYPE_DECIMALV2, false>(
- column_ptr, result, rows_buffer, scale, {type_ptr});
+ column_ptr, result, rows_buffer, col_const, scale,
{type_ptr});
}
break;
}
@@ -738,10 +753,10 @@ Status
VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
auto& nested_type =
assert_cast<const
DataTypeNullable&>(*type_ptr).get_nested_type();
status = _add_one_column<PrimitiveType::TYPE_DECIMAL32, true>(
- column_ptr, result, rows_buffer, scale, {nested_type});
+ column_ptr, result, rows_buffer, col_const, scale,
{nested_type});
} else {
status = _add_one_column<PrimitiveType::TYPE_DECIMAL32, false>(
- column_ptr, result, rows_buffer, scale, {type_ptr});
+ column_ptr, result, rows_buffer, col_const, scale,
{type_ptr});
}
break;
}
@@ -750,10 +765,10 @@ Status
VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
auto& nested_type =
assert_cast<const
DataTypeNullable&>(*type_ptr).get_nested_type();
status = _add_one_column<PrimitiveType::TYPE_DECIMAL64, true>(
- column_ptr, result, rows_buffer, scale, {nested_type});
+ column_ptr, result, rows_buffer, col_const, scale,
{nested_type});
} else {
status = _add_one_column<PrimitiveType::TYPE_DECIMAL64, false>(
- column_ptr, result, rows_buffer, scale, {type_ptr});
+ column_ptr, result, rows_buffer, col_const, scale,
{type_ptr});
}
break;
}
@@ -762,51 +777,51 @@ Status
VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
auto& nested_type =
assert_cast<const
DataTypeNullable&>(*type_ptr).get_nested_type();
status = _add_one_column<PrimitiveType::TYPE_DECIMAL128I,
true>(
- column_ptr, result, rows_buffer, scale, {nested_type});
+ column_ptr, result, rows_buffer, col_const, scale,
{nested_type});
} else {
status = _add_one_column<PrimitiveType::TYPE_DECIMAL128I,
false>(
- column_ptr, result, rows_buffer, scale, {type_ptr});
+ column_ptr, result, rows_buffer, col_const, scale,
{type_ptr});
}
break;
}
case TYPE_JSONB: {
if (type_ptr->is_nullable()) {
status = _add_one_column<PrimitiveType::TYPE_JSONB,
true>(column_ptr, result,
-
rows_buffer);
+
rows_buffer, col_const);
} else {
status = _add_one_column<PrimitiveType::TYPE_JSONB,
false>(column_ptr, result,
-
rows_buffer);
+
rows_buffer, col_const);
}
break;
}
case TYPE_DATE:
case TYPE_DATETIME: {
if (type_ptr->is_nullable()) {
- status = _add_one_column<PrimitiveType::TYPE_DATETIME,
true>(column_ptr, result,
-
rows_buffer);
+ status = _add_one_column<PrimitiveType::TYPE_DATETIME, true>(
+ column_ptr, result, rows_buffer, col_const);
} else {
- status = _add_one_column<PrimitiveType::TYPE_DATETIME,
false>(column_ptr, result,
-
rows_buffer);
+ status = _add_one_column<PrimitiveType::TYPE_DATETIME, false>(
+ column_ptr, result, rows_buffer, col_const);
}
break;
}
case TYPE_DATEV2: {
if (type_ptr->is_nullable()) {
status = _add_one_column<PrimitiveType::TYPE_DATEV2,
true>(column_ptr, result,
-
rows_buffer);
+
rows_buffer, col_const);
} else {
status = _add_one_column<PrimitiveType::TYPE_DATEV2,
false>(column_ptr, result,
-
rows_buffer);
+
rows_buffer, col_const);
}
break;
}
case TYPE_DATETIMEV2: {
if (type_ptr->is_nullable()) {
- status = _add_one_column<PrimitiveType::TYPE_DATETIMEV2,
true>(column_ptr, result,
-
rows_buffer, scale);
+ status = _add_one_column<PrimitiveType::TYPE_DATETIMEV2, true>(
+ column_ptr, result, rows_buffer, col_const, scale);
} else {
- status = _add_one_column<PrimitiveType::TYPE_DATETIMEV2,
false>(column_ptr, result,
-
rows_buffer, scale);
+ status = _add_one_column<PrimitiveType::TYPE_DATETIMEV2,
false>(
+ column_ptr, result, rows_buffer, col_const, scale);
}
break;
}
@@ -815,10 +830,10 @@ Status
VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
case TYPE_OBJECT: {
if (type_ptr->is_nullable()) {
status = _add_one_column<PrimitiveType::TYPE_OBJECT,
true>(column_ptr, result,
-
rows_buffer);
+
rows_buffer, col_const);
} else {
status = _add_one_column<PrimitiveType::TYPE_OBJECT,
false>(column_ptr, result,
-
rows_buffer);
+
rows_buffer, col_const);
}
break;
}
@@ -831,11 +846,11 @@ Status
VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
assert_cast<const
DataTypeNullable&>(*type_ptr).get_nested_type();
auto& sub_type = assert_cast<const
DataTypeArray&>(*nested_type).get_nested_type();
status = _add_one_column<PrimitiveType::TYPE_ARRAY, true>(
- column_ptr, result, rows_buffer, scale, {sub_type});
+ column_ptr, result, rows_buffer, col_const, scale,
{sub_type});
} else {
auto& sub_type = assert_cast<const
DataTypeArray&>(*type_ptr).get_nested_type();
status = _add_one_column<PrimitiveType::TYPE_ARRAY, false>(
- column_ptr, result, rows_buffer, scale, {sub_type});
+ column_ptr, result, rows_buffer, col_const, scale,
{sub_type});
}
break;
}
@@ -845,11 +860,11 @@ Status
VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
assert_cast<const
DataTypeNullable&>(*type_ptr).get_nested_type();
auto& sub_types = assert_cast<const
DataTypeStruct&>(*nested_type).get_elements();
status = _add_one_column<PrimitiveType::TYPE_STRUCT, true>(
- column_ptr, result, rows_buffer, scale, sub_types);
+ column_ptr, result, rows_buffer, col_const, scale,
sub_types);
} else {
auto& sub_types = assert_cast<const
DataTypeStruct&>(*type_ptr).get_elements();
status = _add_one_column<PrimitiveType::TYPE_STRUCT, false>(
- column_ptr, result, rows_buffer, scale, sub_types);
+ column_ptr, result, rows_buffer, col_const, scale,
sub_types);
}
break;
}
@@ -858,10 +873,10 @@ Status
VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
auto& nested_type =
assert_cast<const
DataTypeNullable&>(*type_ptr).get_nested_type(); //for map
status = _add_one_column<PrimitiveType::TYPE_MAP, true>(
- column_ptr, result, rows_buffer, scale, {nested_type});
+ column_ptr, result, rows_buffer, col_const, scale,
{nested_type});
} else {
status = _add_one_column<PrimitiveType::TYPE_MAP, false>(
- column_ptr, result, rows_buffer, scale, {type_ptr});
+ column_ptr, result, rows_buffer, col_const, scale,
{type_ptr});
}
break;
}
diff --git a/be/src/vec/sink/vmysql_result_writer.h
b/be/src/vec/sink/vmysql_result_writer.h
index 572f7313b2..19c2838c60 100644
--- a/be/src/vec/sink/vmysql_result_writer.h
+++ b/be/src/vec/sink/vmysql_result_writer.h
@@ -46,13 +46,13 @@ public:
const std::vector<vectorized::VExprContext*>&
output_vexpr_ctxs,
RuntimeProfile* parent_profile);
- virtual Status init(RuntimeState* state) override;
+ Status init(RuntimeState* state) override;
- virtual Status append_block(Block& block) override;
+ Status append_block(Block& block) override;
- virtual bool can_sink() override;
+ bool can_sink() override;
- virtual Status close() override;
+ Status close() override;
const ResultList& results() { return _results; }
@@ -62,11 +62,11 @@ private:
template <PrimitiveType type, bool is_nullable>
Status _add_one_column(const ColumnPtr& column_ptr,
std::unique_ptr<TFetchDataResult>& result,
std::vector<MysqlRowBuffer<is_binary_format>>&
rows_buffer,
- int scale = -1, const DataTypes& sub_types =
DataTypes());
+ bool arg_const, int scale = -1,
+ const DataTypes& sub_types = DataTypes());
int _add_one_cell(const ColumnPtr& column_ptr, size_t row_idx, const
DataTypePtr& type,
MysqlRowBuffer<is_binary_format>& buffer, int scale =
-1);
-private:
BufferControlBlock* _sinker;
const std::vector<vectorized::VExprContext*>& _output_vexpr_ctxs;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]