This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1d1b2f98c3 [refactor](function) let agg functions exception safety (#19109)
1d1b2f98c3 is described below
commit 1d1b2f98c3ed4a38a2c03b5916806a953187050e
Author: zhangstar333 <[email protected]>
AuthorDate: Thu May 11 10:17:11 2023 +0800
[refactor](function) let agg functions exception safety (#19109)
---
be/src/util/tdigest.h | 2 +
.../aggregate_function_bitmap.cpp | 11 +-
.../aggregate_function_java_udaf.h | 114 +++++++++++++--------
.../aggregate_function_min_max.h | 18 ++--
.../aggregate_function_min_max_by.h | 10 +-
.../aggregate_function_percentile_approx.h | 8 +-
.../aggregate_function_reader_first_last.h | 16 +--
.../aggregate_function_window.cpp | 12 +--
8 files changed, 113 insertions(+), 78 deletions(-)
diff --git a/be/src/util/tdigest.h b/be/src/util/tdigest.h
index 079c0996ba..0a8168fe8e 100644
--- a/be/src/util/tdigest.h
+++ b/be/src/util/tdigest.h
@@ -51,6 +51,7 @@
#include <utility>
#include <vector>
+#include "common/factory_creator.h"
#include "common/logging.h"
#include "udf/udf.h"
#include "util/debug_util.h"
@@ -119,6 +120,7 @@ struct CentroidComparator {
};
class TDigest {
+ ENABLE_FACTORY_CREATOR(TDigest);
struct TDigestRadixSortTraits {
using Element = Centroid;
using Key = Value;
diff --git a/be/src/vec/aggregate_functions/aggregate_function_bitmap.cpp
b/be/src/vec/aggregate_functions/aggregate_function_bitmap.cpp
index 35e4636dfa..0676fd5bc2 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_bitmap.cpp
+++ b/be/src/vec/aggregate_functions/aggregate_function_bitmap.cpp
@@ -25,15 +25,18 @@
namespace doris::vectorized {
template <bool nullable, template <bool, typename> class
AggregateFunctionTemplate>
-IAggregateFunction* create_with_int_data_type(const DataTypes& argument_type) {
+AggregateFunctionPtr create_with_int_data_type(const DataTypes& argument_type)
{
auto type = remove_nullable(argument_type[0]);
WhichDataType which(type);
-#define DISPATCH(TYPE)
\
- if (which.idx == TypeIndex::TYPE) {
\
- return new AggregateFunctionTemplate<nullable,
ColumnVector<TYPE>>(argument_type); \
+#define DISPATCH(TYPE)
\
+ if (which.idx == TypeIndex::TYPE) {
\
+ return std::make_shared<AggregateFunctionTemplate<nullable,
ColumnVector<TYPE>>>( \
+ argument_type);
\
}
FOR_INTEGER_TYPES(DISPATCH)
#undef DISPATCH
+ LOG(WARNING) << "with unknowed type, failed in create_with_int_data_type
bitmap_union_int"
+ << " and type is: " << argument_type[0]->get_name();
return nullptr;
}
diff --git a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
index 4ab11049d3..d913c8b32d 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
@@ -23,6 +23,8 @@
#include <cstdint>
#include <memory>
+#include "common/compiler_util.h"
+#include "common/exception.h"
#include "common/status.h"
#include "gutil/strings/substitute.h"
#include "runtime/user_function_cache.h"
@@ -53,18 +55,18 @@ public:
AggregateJavaUdafData() = default;
AggregateJavaUdafData(int64_t num_args) {
argument_size = num_args;
- input_values_buffer_ptr.reset(new int64_t[num_args]);
- input_nulls_buffer_ptr.reset(new int64_t[num_args]);
- input_offsets_ptrs.reset(new int64_t[num_args]);
- input_array_nulls_buffer_ptr.reset(new int64_t[num_args]);
- input_array_string_offsets_ptrs.reset(new int64_t[num_args]);
- input_place_ptrs.reset(new int64_t);
- output_value_buffer.reset(new int64_t);
- output_null_value.reset(new int64_t);
- output_offsets_ptr.reset(new int64_t);
- output_intermediate_state_ptr.reset(new int64_t);
- output_array_null_ptr.reset(new int64_t);
- output_array_string_offsets_ptr.reset(new int64_t);
+ input_values_buffer_ptr = std::make_unique<int64_t[]>(num_args);
+ input_nulls_buffer_ptr = std::make_unique<int64_t[]>(num_args);
+ input_offsets_ptrs = std::make_unique<int64_t[]>(num_args);
+ input_array_nulls_buffer_ptr = std::make_unique<int64_t[]>(num_args);
+ input_array_string_offsets_ptrs =
std::make_unique<int64_t[]>(num_args);
+ input_place_ptrs = std::make_unique<int64_t>(0);
+ output_value_buffer = std::make_unique<int64_t>(0);
+ output_null_value = std::make_unique<int64_t>(0);
+ output_offsets_ptr = std::make_unique<int64_t>(0);
+ output_intermediate_state_ptr = std::make_unique<int64_t>(0);
+ output_array_null_ptr = std::make_unique<int64_t>(0);
+ output_array_string_offsets_ptr = std::make_unique<int64_t>(0);
}
~AggregateJavaUdafData() {
@@ -206,6 +208,7 @@ public:
// save it in BE, Because i'm not sure there is a way to use the
byte[] not allocate again.
jbyteArray arr = (jbyteArray)(env->CallNonvirtualObjectMethod(
executor_obj, executor_cl, executor_serialize_id, place));
+ RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
int len = env->GetArrayLength(arr);
serialize_data.resize(len);
env->GetByteArrayRegion(arr, 0, len,
reinterpret_cast<jbyte*>(serialize_data.data()));
@@ -251,17 +254,17 @@ public:
jboolean res = env->CallNonvirtualBooleanMethod(executor_obj,
executor_cl, \
executor_result_id,
to.size() - 1, place); \
while (res != JNI_TRUE) {
\
- /*Add this check is now, the agg function can't deal with the
return status, */ \
- /*even we return a bad status, nobody could deal with it,*/
\
- /*so add this limit avoid std::bad_alloc, (1024<<10) is enough*/
\
- /*but this maybe get a mistake of result,when could handle
exception need removethis*/ \
- if (increase_buffer_size == 10) {
\
- return Status::MemoryAllocFailed("memory allocate failed,
buffer:{},size:{}", \
- increase_buffer_size,
buffer_size); \
- }
\
+ RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
\
increase_buffer_size++;
\
buffer_size =
JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \
- chars.resize(buffer_size);
\
+ try {
\
+ chars.resize(buffer_size);
\
+ } catch (std::bad_alloc const& e) {
\
+ throw doris::Exception(
\
+ ErrorCode::INTERNAL_ERROR,
\
+ "memory allocate failed in column string,
buffer:{},size:{}", \
+ increase_buffer_size, buffer_size);
\
+ }
\
*output_value_buffer = reinterpret_cast<int64_t>(chars.data());
\
*output_intermediate_state_ptr = chars.size();
\
res = env->CallNonvirtualBooleanMethod(executor_obj, executor_cl,
executor_result_id, \
@@ -298,15 +301,19 @@ public:
jboolean res = env->CallNonvirtualBooleanMethod(
\
executor_obj, executor_cl, executor_result_id, to.size() -
1, place); \
while (res != JNI_TRUE) {
\
- if (increase_buffer_size == 10) {
\
- return Status::MemoryAllocFailed("memory allocate failed,
buffer:{},size:{}", \
- increase_buffer_size,
buffer_size); \
- }
\
+ RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
\
increase_buffer_size++;
\
buffer_size =
JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \
- null_map_data.resize(buffer_size);
\
- chars.resize(buffer_size);
\
- offsets.resize(buffer_size);
\
+ try {
\
+ null_map_data.resize(buffer_size);
\
+ chars.resize(buffer_size);
\
+ offsets.resize(buffer_size);
\
+ } catch (std::bad_alloc const& e) {
\
+ throw doris::Exception(
\
+ ErrorCode::INTERNAL_ERROR,
\
+ "memory allocate failed in array column string,
buffer:{},size:{}", \
+ increase_buffer_size, buffer_size);
\
+ }
\
*output_array_null_ptr =
reinterpret_cast<int64_t>(null_map_data.data()); \
*output_value_buffer =
reinterpret_cast<int64_t>(chars.data()); \
*output_array_string_offsets_ptr =
reinterpret_cast<int64_t>(offsets.data()); \
@@ -320,14 +327,18 @@ public:
jboolean res = env->CallNonvirtualBooleanMethod(
\
executor_obj, executor_cl, executor_result_id, to.size() -
1, place); \
while (res != JNI_TRUE) {
\
- if (increase_buffer_size == 10) {
\
- return Status::MemoryAllocFailed("memory allocate failed,
buffer:{},size:{}", \
- increase_buffer_size,
buffer_size); \
- }
\
+ RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
\
increase_buffer_size++;
\
buffer_size =
JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \
- null_map_data.resize(buffer_size);
\
- data_column->resize(buffer_size);
\
+ try {
\
+ null_map_data.resize(buffer_size);
\
+ data_column->resize(buffer_size);
\
+ } catch (std::bad_alloc const& e) {
\
+ throw doris::Exception(
\
+ ErrorCode::INTERNAL_ERROR,
\
+ "memory allocate failed in array number column,
buffer:{},size:{}", \
+ increase_buffer_size, buffer_size);
\
+ }
\
*output_array_null_ptr =
reinterpret_cast<int64_t>(null_map_data.data()); \
*output_value_buffer =
\
reinterpret_cast<int64_t>(data_column->get_raw_data().data); \
@@ -409,6 +420,7 @@ private:
class AggregateJavaUdaf final
: public IAggregateFunctionDataHelper<AggregateJavaUdafData,
AggregateJavaUdaf> {
public:
+ ENABLE_FACTORY_CREATOR(AggregateJavaUdaf);
AggregateJavaUdaf(const TFunction& fn, const DataTypes& argument_types,
const DataTypePtr& return_type)
: IAggregateFunctionDataHelper(argument_types),
@@ -416,7 +428,7 @@ public:
_return_type(return_type),
_first_created(true),
_exec_place(nullptr) {}
- ~AggregateJavaUdaf() = default;
+ ~AggregateJavaUdaf() override = default;
static AggregateFunctionPtr create(const TFunction& fn, const DataTypes&
argument_types,
const DataTypePtr& return_type) {
@@ -461,6 +473,7 @@ public:
Arena*) const override {
LOG(WARNING) << " shouldn't going add function, there maybe some error
about function "
<< _fn.name.function_name;
+ throw doris::Exception(ErrorCode::INTERNAL_ERROR, "shouldn't going add
function");
}
void add_batch(size_t batch_size, AggregateDataPtr* places, size_t
place_offset,
@@ -469,7 +482,11 @@ public:
for (size_t i = 0; i < batch_size; ++i) {
places_address[i] = reinterpret_cast<int64_t>(places[i] +
place_offset);
}
- this->data(_exec_place).add(places_address, false, columns, 0,
batch_size, argument_types);
+ Status st = this->data(_exec_place)
+ .add(places_address, false, columns, 0,
batch_size, argument_types);
+ if (UNLIKELY(st != Status::OK())) {
+ throw doris::Exception(ErrorCode::INTERNAL_ERROR, st.to_string());
+ }
}
// TODO: Here we calling method by jni, And if we get a thrown from FE,
@@ -478,23 +495,35 @@ public:
Arena* /*arena*/) const override {
int64_t places_address[1];
places_address[0] = reinterpret_cast<int64_t>(place);
- this->data(_exec_place).add(places_address, true, columns, 0,
batch_size, argument_types);
+ Status st = this->data(_exec_place)
+ .add(places_address, true, columns, 0, batch_size,
argument_types);
+ if (UNLIKELY(st != Status::OK())) {
+ throw doris::Exception(ErrorCode::INTERNAL_ERROR, st.to_string());
+ }
}
// TODO: reset function should be implement also in struct data
void reset(AggregateDataPtr /*place*/) const override {
LOG(WARNING) << " shouldn't going reset function, there maybe some
error about function "
<< _fn.name.function_name;
+ throw doris::Exception(ErrorCode::INTERNAL_ERROR, "shouldn't going
reset function");
}
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
Arena*) const override {
- this->data(_exec_place).merge(this->data(rhs),
reinterpret_cast<int64_t>(place));
+ Status st =
+ this->data(_exec_place).merge(this->data(rhs),
reinterpret_cast<int64_t>(place));
+ if (UNLIKELY(st != Status::OK())) {
+ throw doris::Exception(ErrorCode::INTERNAL_ERROR, st.to_string());
+ }
}
void serialize(ConstAggregateDataPtr __restrict place, BufferWritable&
buf) const override {
- this->data(const_cast<AggregateDataPtr&>(_exec_place))
- .write(buf, reinterpret_cast<int64_t>(place));
+ Status st = this->data(const_cast<AggregateDataPtr&>(_exec_place))
+ .write(buf, reinterpret_cast<int64_t>(place));
+ if (UNLIKELY(st != Status::OK())) {
+ throw doris::Exception(ErrorCode::INTERNAL_ERROR, st.to_string());
+ }
}
// during merge-finalized phase, for deserialize and merge firstly,
@@ -509,7 +538,10 @@ public:
}
void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn&
to) const override {
- this->data(_exec_place).get(to, _return_type,
reinterpret_cast<int64_t>(place));
+ Status st = this->data(_exec_place).get(to, _return_type,
reinterpret_cast<int64_t>(place));
+ if (UNLIKELY(st != Status::OK())) {
+ throw doris::Exception(ErrorCode::INTERNAL_ERROR, st.to_string());
+ }
}
private:
diff --git a/be/src/vec/aggregate_functions/aggregate_function_min_max.h
b/be/src/vec/aggregate_functions/aggregate_function_min_max.h
index 4d8a2ae598..2d1f7b7c00 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_min_max.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_min_max.h
@@ -91,7 +91,7 @@ public:
}
}
- void read(BufferReadable& buf) {
+ void read(BufferReadable& buf, Arena* arena) {
read_binary(has_value, buf);
if (has()) {
read_binary(value, buf);
@@ -209,7 +209,7 @@ public:
}
}
- void read(BufferReadable& buf) {
+ void read(BufferReadable& buf, Arena* arena) {
read_binary(has_value, buf);
if (has()) {
read_binary(value, buf);
@@ -308,7 +308,7 @@ private:
char small_data[MAX_SMALL_STRING_SIZE]; /// Including the terminating zero.
public:
- ~SingleValueDataString() { delete[] large_data; }
+ ~SingleValueDataString() = default;
constexpr static bool IsFixedLength = false;
@@ -340,7 +340,7 @@ public:
}
}
- void read(BufferReadable& buf) {
+ void read(BufferReadable& buf, Arena* arena) {
Int32 rhs_size;
read_binary(rhs_size, buf);
@@ -356,8 +356,7 @@ public:
} else {
if (capacity < rhs_size) {
capacity =
static_cast<UInt32>(round_up_to_power_of_two_or_zero(rhs_size));
- delete[] large_data;
- large_data = new char[capacity];
+ large_data = arena->alloc(capacity);
}
size = rhs_size;
@@ -386,8 +385,7 @@ public:
if (capacity < value_size) {
/// Don't free large_data here.
capacity = round_up_to_power_of_two_or_zero(value_size);
- delete[] large_data;
- large_data = new char[capacity];
+ large_data = arena->alloc(capacity);
}
size = value_size;
@@ -546,8 +544,8 @@ public:
}
void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
- Arena*) const override {
- this->data(place).read(buf);
+ Arena* arena) const override {
+ this->data(place).read(buf, arena);
}
void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn&
to) const override {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_min_max_by.h
b/be/src/vec/aggregate_functions/aggregate_function_min_max_by.h
index 69945facfc..6fa82fa2a1 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_min_max_by.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_min_max_by.h
@@ -45,9 +45,9 @@ public:
key.write(buf);
}
- void read(BufferReadable& buf) {
- value.read(buf);
- key.read(buf);
+ void read(BufferReadable& buf, Arena* arena) {
+ value.read(buf, arena);
+ key.read(buf, arena);
}
};
@@ -129,8 +129,8 @@ public:
}
void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
- Arena*) const override {
- this->data(place).read(buf);
+ Arena* arena) const override {
+ this->data(place).read(buf, arena);
}
void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn&
to) const override {
diff --git
a/be/src/vec/aggregate_functions/aggregate_function_percentile_approx.h
b/be/src/vec/aggregate_functions/aggregate_function_percentile_approx.h
index b94b922840..13884e92ba 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_percentile_approx.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_percentile_approx.h
@@ -69,7 +69,7 @@ struct PercentileApproxState {
if (compression < 2048 || compression > 10000) {
compression = 10000;
}
- digest.reset(new TDigest(compression));
+ digest = TDigest::create_unique(compression);
compressions = compression;
init_flag = true;
}
@@ -101,7 +101,7 @@ struct PercentileApproxState {
read_binary(compressions, buf);
std::string str;
read_binary(str, buf);
- digest.reset(new TDigest(compressions));
+ digest = TDigest::create_unique(compressions);
digest->unserialize((uint8_t*)str.c_str());
}
@@ -121,7 +121,7 @@ struct PercentileApproxState {
DCHECK(digest.get() != nullptr);
digest->merge(rhs.digest.get());
} else {
- digest.reset(new TDigest(compressions));
+ digest = TDigest::create_unique(compressions);
digest->merge(rhs.digest.get());
init_flag = true;
}
@@ -138,7 +138,7 @@ struct PercentileApproxState {
void reset() {
target_quantile = INIT_QUANTILE;
init_flag = false;
- digest.reset(new TDigest(compressions));
+ digest = TDigest::create_unique(compressions);
}
bool init_flag = false;
diff --git
a/be/src/vec/aggregate_functions/aggregate_function_reader_first_last.h
b/be/src/vec/aggregate_functions/aggregate_function_reader_first_last.h
index 9ec64b248b..54d146ca37 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_reader_first_last.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_reader_first_last.h
@@ -232,20 +232,20 @@ private:
template <template <typename> class AggregateFunctionTemplate, template
<typename> class Impl,
bool result_is_nullable, bool arg_is_nullable, bool is_copy = false>
-IAggregateFunction* create_function_single_value(const String& name,
- const DataTypes&
argument_types) {
+AggregateFunctionPtr create_function_single_value(const String& name,
+ const DataTypes&
argument_types) {
auto type = remove_nullable(argument_types[0]);
WhichDataType which(*type);
-#define DISPATCH(TYPE, COLUMN_TYPE) \
- if (which.idx == TypeIndex::TYPE) \
- return new AggregateFunctionTemplate<Impl<ReaderFirstAndLastData< \
- COLUMN_TYPE, result_is_nullable, arg_is_nullable,
is_copy>>>(argument_types);
+#define DISPATCH(TYPE, COLUMN_TYPE)
\
+ if (which.idx == TypeIndex::TYPE)
\
+ return
std::make_shared<AggregateFunctionTemplate<Impl<ReaderFirstAndLastData< \
+ COLUMN_TYPE, result_is_nullable, arg_is_nullable,
is_copy>>>>(argument_types);
TYPE_TO_COLUMN_TYPE(DISPATCH)
#undef DISPATCH
- LOG(FATAL) << "with unknowed type, failed in create_aggregate_function_"
<< name
- << " and type is: " << argument_types[0]->get_name();
+ LOG(WARNING) << "with unknowed type, failed in
create_aggregate_function_" << name
+ << " and type is: " << argument_types[0]->get_name();
return nullptr;
}
diff --git a/be/src/vec/aggregate_functions/aggregate_function_window.cpp
b/be/src/vec/aggregate_functions/aggregate_function_window.cpp
index f18668690d..bacdd45131 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_window.cpp
+++ b/be/src/vec/aggregate_functions/aggregate_function_window.cpp
@@ -34,15 +34,15 @@ namespace doris::vectorized {
template <template <typename> class AggregateFunctionTemplate,
template <typename ColVecType, bool, bool> class Data, template
<typename> class Impl,
bool result_is_nullable, bool arg_is_nullable>
-IAggregateFunction* create_function_lead_lag_first_last(const String& name,
- const DataTypes&
argument_types) {
+AggregateFunctionPtr create_function_lead_lag_first_last(const String& name,
+ const DataTypes&
argument_types) {
auto type = remove_nullable(argument_types[0]);
WhichDataType which(*type);
-#define DISPATCH(TYPE, COLUMN_TYPE) \
- if (which.idx == TypeIndex::TYPE) \
- return new AggregateFunctionTemplate< \
- Impl<Data<COLUMN_TYPE, result_is_nullable,
arg_is_nullable>>>(argument_types);
+#define DISPATCH(TYPE, COLUMN_TYPE) \
+ if (which.idx == TypeIndex::TYPE) \
+ return std::make_shared<AggregateFunctionTemplate< \
+ Impl<Data<COLUMN_TYPE, result_is_nullable,
arg_is_nullable>>>>(argument_types);
TYPE_TO_BASIC_COLUMN_TYPE(DISPATCH)
#undef DISPATCH
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]