This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
new 818072389f [FIX](array)Fix update hash func array (#21630)
818072389f is described below
commit 818072389f7c9b4344699bfc88a8babe663193c4
Author: amory <[email protected]>
AuthorDate: Sun Jul 9 22:03:58 2023 +0800
[FIX](array)Fix update hash func array (#21630)
---
be/src/vec/columns/column.h | 16 ++++++---
be/src/vec/columns/column_array.cpp | 62 +++++++++++++++++++++++++++++++--
be/src/vec/columns/column_array.h | 14 +++++++-
be/src/vec/columns/column_const.h | 12 +++++++
be/src/vec/columns/column_decimal.cpp | 32 ++++++++++-------
be/src/vec/columns/column_decimal.h | 2 ++
be/src/vec/columns/column_nullable.cpp | 18 ++++++++++
be/src/vec/columns/column_nullable.h | 2 ++
be/src/vec/columns/column_string.h | 12 +++++++
be/src/vec/columns/column_vector.h | 19 ++++++++++
be/src/vec/sink/vdata_stream_sender.cpp | 3 ++
11 files changed, 172 insertions(+), 20 deletions(-)
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index 0a2c66183e..89dd4fbef4 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -339,7 +339,7 @@ public:
/// On subsequent calls of this method for sequence of column values of
arbitrary types,
/// passed bytes to hash must identify sequence of values unambiguously.
virtual void update_hash_with_value(size_t n, SipHash& hash) const {
-        LOG(FATAL) << "update_hash_with_value siphash not supported";
+        // Fallback for column types that do not override this method: emit a
+        // FATAL log entry prefixed with get_name(). The leading space in the
+        // literal keeps the column name and the message from running together.
+        LOG(FATAL) << get_name() << " update_hash_with_value siphash not supported";
}
/// Update state of hash function with value of n elements to avoid the
virtual function call
@@ -348,7 +348,7 @@ public:
/// do xxHash here, faster than other hash method
virtual void update_hashes_with_value(std::vector<SipHash>& hashes,
const uint8_t* __restrict null_data = nullptr) const {
-        LOG(FATAL) << "update_hashes_with_value siphash not supported";
+        // Fallback for column types without a batch SipHash implementation:
+        // emit a FATAL log entry prefixed with get_name(). The leading space
+        // separates the column name from the message text.
+        LOG(FATAL) << get_name() << " update_hashes_with_value siphash not supported";
};
/// Update state of hash function with value of n elements to avoid the
virtual function call
@@ -357,17 +357,25 @@ public:
/// do xxHash here, faster than other sip hash
virtual void update_hashes_with_value(uint64_t* __restrict hashes,
const uint8_t* __restrict null_data = nullptr) const {
-        LOG(FATAL) << "update_hashes_with_value xxhash not supported";
+        // Fallback for column types without a batch xxHash implementation:
+        // emit a FATAL log entry prefixed with get_name(). The leading space
+        // separates the column name from the message text.
+        LOG(FATAL) << get_name() << " update_hashes_with_value xxhash not supported";
};
+    /// Folds the value of row n into the running xxHash state `hash`.
+    /// This base version only logs a FATAL message prefixed with get_name();
+    /// column types that support per-row xxHash override it.
+    virtual void update_xxHash_with_value(size_t n, uint64_t& hash) const {
+        // The message names this method so the log points at the right missing override.
+        LOG(FATAL) << get_name() << " update_xxHash_with_value not supported";
+    }
+
/// Update state of crc32 hash function with value of n elements to avoid
the virtual function call
/// null_data to mark whether need to do hash compute, null_data == nullptr
/// means all element need to do hash function, else only *null_data != 0
need to do hash func
virtual void update_crcs_with_value(std::vector<uint64_t>& hash, PrimitiveType type,
const uint8_t* __restrict null_data = nullptr) const {
-        LOG(FATAL) << "update_crcs_with_value not supported";
+        // Fallback for column types without a batch CRC implementation: emit a
+        // FATAL log entry prefixed with get_name(). The leading space separates
+        // the column name from the message text.
+        LOG(FATAL) << get_name() << " update_crcs_with_value not supported";
};
+    /// Folds the value of row n into the running CRC state `hash`.
+    /// This base version only logs a FATAL message prefixed with get_name();
+    /// column types that support per-row CRC override it.
+    virtual void update_crc_with_value(size_t n, uint64_t& hash) const {
+        const auto column_name = get_name();
+        LOG(FATAL) << column_name << " update_crc_with_value not supported";
+    }
+
/** Removes elements that don't match the filter.
* Is used in WHERE and HAVING operations.
* If result_size_hint > 0, then makes advance reserve(result_size_hint)
for the result column;
diff --git a/be/src/vec/columns/column_array.cpp
b/be/src/vec/columns/column_array.cpp
index 4ae7ada9f7..999991fde5 100644
--- a/be/src/vec/columns/column_array.cpp
+++ b/be/src/vec/columns/column_array.cpp
@@ -223,11 +223,67 @@ const char*
ColumnArray::deserialize_and_insert_from_arena(const char* pos) {
return pos;
}
-void ColumnArray::update_hash_with_value(size_t n, SipHash& hash) const {
- size_t array_size = size_at(n);
+// Batch SipHash update for an array column. The whole body is the
+// SIP_HASHES_FUNCTION_COLUMN_IMPL() macro, which is defined outside this view
+// and expands in terms of the local names `hashes` and `null_data`.
+// NOTE(review): this change deletes ColumnArray::update_hash_with_value(size_t, SipHash&).
+// If the macro dispatches per row to update_hash_with_value(), array columns
+// would now hit the IColumn fallback that logs FATAL - confirm against the macro.
+void ColumnArray::update_hashes_with_value(std::vector<SipHash>& hashes,
+ const uint8_t* __restrict
null_data) const {
+ SIP_HASHES_FUNCTION_COLUMN_IMPL();
+}
+
+// Folds array row n into the running xxHash64 state `hash`.
+// The element count of the row is hashed first, then every nested element of
+// the row is folded in, in order, through the nested column's per-row hook.
+void ColumnArray::update_xxHash_with_value(size_t n, uint64_t& hash) const {
+    const size_t elem_size = size_at(n);
+    const size_t offset = offset_at(n);
+    hash = HashUtil::xxHash64WithSeed(reinterpret_cast<const char*>(&elem_size), sizeof(elem_size),
+                                      hash);
+    // size_t index: `auto i = 0` deduces int, which mixes signedness in the
+    // comparison with elem_size and cannot represent very large element counts.
+    for (size_t i = 0; i < elem_size; ++i) {
+        get_data().update_xxHash_with_value(offset + i, hash);
+    }
+}
+
+// Folds array row n into the running CRC state `crc`.
+// The element count of the row is hashed first, then every nested element of
+// the row is folded in, in order, through the nested column's per-row hook.
+void ColumnArray::update_crc_with_value(size_t n, uint64_t& crc) const {
+    size_t elem_size = size_at(n);
size_t offset = offset_at(n);
-    for (size_t i = 0; i < array_size; ++i) get_data().update_hash_with_value(offset + i, hash);
+    crc = HashUtil::zlib_crc_hash(reinterpret_cast<const char*>(&elem_size), sizeof(elem_size),
+                                  crc);
+    // size_t index: `auto i = 0` deduces int, which mixes signedness in the
+    // comparison with elem_size and cannot represent very large element counts.
+    for (size_t i = 0; i < elem_size; ++i) {
+        get_data().update_crc_with_value(offset + i, crc);
+    }
+}
+
+// Batch xxHash update: folds every array row i into hashes[i].
+// When a null map is supplied, rows whose null_data entry is non-zero are left untouched.
+void ColumnArray::update_hashes_with_value(uint64_t* __restrict hashes,
+                                           const uint8_t* __restrict null_data) const {
+    const auto row_count = size();
+    for (size_t row = 0; row < row_count; ++row) {
+        if (null_data != nullptr && null_data[row] != 0) {
+            continue;
+        }
+        update_xxHash_with_value(row, hashes[row]);
+    }
+}
+
+// Batch CRC update: folds every array row i into hash[i]; hash must hold one
+// entry per row (checked with DCHECK). `type` is not read by this override.
+// When a null map is supplied, rows whose null_data entry is non-zero are left untouched.
+void ColumnArray::update_crcs_with_value(std::vector<uint64_t>& hash, PrimitiveType type,
+                                         const uint8_t* __restrict null_data) const {
+    const auto row_count = hash.size();
+    DCHECK(row_count == size());
+
+    for (size_t row = 0; row < row_count; ++row) {
+        if (null_data != nullptr && null_data[row] != 0) {
+            continue;
+        }
+        update_crc_with_value(row, hash[row]);
+    }
}
void ColumnArray::insert(const Field& x) {
diff --git a/be/src/vec/columns/column_array.h
b/be/src/vec/columns/column_array.h
index 824cca8b23..7239de3347 100644
--- a/be/src/vec/columns/column_array.h
+++ b/be/src/vec/columns/column_array.h
@@ -97,7 +97,17 @@ public:
void insert_data(const char* pos, size_t length) override;
StringRef serialize_value_into_arena(size_t n, Arena& arena, char const*&
begin) const override;
const char* deserialize_and_insert_from_arena(const char* pos) override;
- void update_hash_with_value(size_t n, SipHash& hash) const override;
+ void update_xxHash_with_value(size_t n, uint64_t& hash) const override;
+ void update_crc_with_value(size_t n, uint64_t& crc) const override;
+
+ void update_hashes_with_value(std::vector<SipHash>& hashes,
+ const uint8_t* __restrict null_data) const
override;
+
+ void update_hashes_with_value(uint64_t* __restrict hashes,
+ const uint8_t* __restrict null_data =
nullptr) const override;
+
+ void update_crcs_with_value(std::vector<uint64_t>& hash, PrimitiveType
type,
+ const uint8_t* __restrict null_data = nullptr)
const override;
void insert_range_from(const IColumn& src, size_t start, size_t length)
override;
void insert(const Field& x) override;
void insert_from(const IColumn& src_, size_t n) override;
@@ -179,6 +189,8 @@ public:
Status filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn*
col_ptr) override;
private:
+ // [[2,1,5,9,1], [1,2,4]] --> data column [2,1,5,9,1,1,2,4], offset[-1] =
0, offset[0] = 5, offset[1] = 8
+ // [[[2,1,5],[9,1]], [[1,2]]] --> data column [3 column array], offset[-1]
= 0, offset[0] = 2, offset[1] = 3
WrappedPtr data;
WrappedPtr offsets;
diff --git a/be/src/vec/columns/column_const.h
b/be/src/vec/columns/column_const.h
index 9637a0943f..13640a0a9a 100644
--- a/be/src/vec/columns/column_const.h
+++ b/be/src/vec/columns/column_const.h
@@ -135,6 +135,18 @@ public:
void update_hashes_with_value(uint64_t* __restrict hashes,
const uint8_t* __restrict null_data) const
override;
+    // Folds the constant value into the running xxHash state. The value is
+    // always read from row 0 of the wrapped column; a null data pointer is
+    // hashed through the dedicated null helper.
+    void update_xxHash_with_value(size_t n, uint64_t& hash) const override {
+        const auto value_ref = data->get_data_at(0);
+        if (value_ref.data != nullptr) {
+            hash = HashUtil::xxHash64WithSeed(value_ref.data, value_ref.size, hash);
+            return;
+        }
+        hash = HashUtil::xxHash64NullWithSeed(hash);
+    }
+
+    // Folds the constant value into the running CRC state by delegating to the
+    // wrapped data column. The wrapped column is addressed at row 0, matching
+    // update_xxHash_with_value in this class, which reads data->get_data_at(0);
+    // forwarding the logical row n would index past the stored constant.
+    // NOTE(review): assumes the wrapped column stores only the single constant row - confirm.
+    void update_crc_with_value(size_t n, uint64_t& crc) const override {
+        get_data_column_ptr()->update_crc_with_value(0, crc);
+    }
ColumnPtr filter(const Filter& filt, ssize_t result_size_hint) const
override;
ColumnPtr replicate(const Offsets& offsets) const override;
diff --git a/be/src/vec/columns/column_decimal.cpp
b/be/src/vec/columns/column_decimal.cpp
index 13902b54a7..78e7d055cb 100644
--- a/be/src/vec/columns/column_decimal.cpp
+++ b/be/src/vec/columns/column_decimal.cpp
@@ -127,36 +127,44 @@ void
ColumnDecimal<T>::update_hashes_with_value(std::vector<SipHash>& hashes,
SIP_HASHES_FUNCTION_COLUMN_IMPL();
}
+// Folds decimal row n into the running CRC state `crc`.
+// DecimalV2 values are hashed as two pieces (int_value() as int64_t, then
+// frac_value() as int32_t); every other decimal type is hashed over its sizeof(T) bytes.
+template <typename T>
+void ColumnDecimal<T>::update_crc_with_value(size_t n, uint64_t& crc) const {
+    if constexpr (IsDecimalV2<T>) {
+        const DecimalV2Value& decimal = (const DecimalV2Value&)data[n];
+        int64_t integer_part = decimal.int_value();
+        int32_t fraction_part = decimal.frac_value();
+        crc = HashUtil::zlib_crc_hash(&integer_part, sizeof(integer_part), crc);
+        crc = HashUtil::zlib_crc_hash(&fraction_part, sizeof(fraction_part), crc);
+    } else {
+        crc = HashUtil::zlib_crc_hash(&data[n], sizeof(T), crc);
+    }
+}
+
+// Batch CRC update for a decimal column: one entry of `hashes` per row
+// (checked with DCHECK). Non-DecimalV2 types go through the
+// DO_CRC_HASHES_FUNCTION_COLUMN_IMPL() macro, whose expansion is not visible
+// here. DecimalV2 rows are now folded in via update_crc_with_value(); rows
+// whose null_data entry is non-zero are skipped when a null map is supplied.
+// NOTE(review): this change also drops the DCHECK(type == TYPE_DECIMALV2)
+// guard from the DecimalV2 branch - confirm that is intended.
template <typename T>
void ColumnDecimal<T>::update_crcs_with_value(std::vector<uint64_t>& hashes,
PrimitiveType type,
const uint8_t* __restrict
null_data) const {
auto s = hashes.size();
DCHECK(s == size());
-
if constexpr (!IsDecimalV2<T>) {
DO_CRC_HASHES_FUNCTION_COLUMN_IMPL()
} else {
- DCHECK(type == TYPE_DECIMALV2);
- auto decimalv2_do_crc = [&](size_t i) {
- const DecimalV2Value& dec_val = (const DecimalV2Value&)data[i];
- int64_t int_val = dec_val.int_value();
- int32_t frac_val = dec_val.frac_value();
- hashes[i] = HashUtil::zlib_crc_hash(&int_val, sizeof(int_val),
hashes[i]);
- hashes[i] = HashUtil::zlib_crc_hash(&frac_val, sizeof(frac_val),
hashes[i]);
- };
-
if (null_data == nullptr) {
for (size_t i = 0; i < s; i++) {
- decimalv2_do_crc(i);
+ update_crc_with_value(i, hashes[i]);
}
} else {
for (size_t i = 0; i < s; i++) {
- if (null_data[i] == 0) decimalv2_do_crc(i);
+ if (null_data[i] == 0) update_crc_with_value(i, hashes[i]);
}
}
}
}
+// Folds decimal row n into the running xxHash64 state by hashing the
+// sizeof(T) bytes stored at data[n].
+template <typename T>
+void ColumnDecimal<T>::update_xxHash_with_value(size_t n, uint64_t& hash) const {
+    const char* value_bytes = reinterpret_cast<const char*>(&data[n]);
+    hash = HashUtil::xxHash64WithSeed(value_bytes, sizeof(T), hash);
+}
+
template <typename T>
void ColumnDecimal<T>::update_hashes_with_value(uint64_t* __restrict hashes,
const uint8_t* __restrict
null_data) const {
diff --git a/be/src/vec/columns/column_decimal.h
b/be/src/vec/columns/column_decimal.h
index a3960067cb..8968ff6017 100644
--- a/be/src/vec/columns/column_decimal.h
+++ b/be/src/vec/columns/column_decimal.h
@@ -159,6 +159,8 @@ public:
const uint8_t* __restrict null_data) const
override;
void update_crcs_with_value(std::vector<uint64_t>& hashes, PrimitiveType
type,
const uint8_t* __restrict null_data) const
override;
+ void update_xxHash_with_value(size_t n, uint64_t& hash) const override;
+ void update_crc_with_value(size_t n, uint64_t& crc) const override;
int compare_at(size_t n, size_t m, const IColumn& rhs_, int
nan_direction_hint) const override;
void get_permutation(bool reverse, size_t limit, int nan_direction_hint,
diff --git a/be/src/vec/columns/column_nullable.cpp
b/be/src/vec/columns/column_nullable.cpp
index 5b087550cc..0e90251343 100644
--- a/be/src/vec/columns/column_nullable.cpp
+++ b/be/src/vec/columns/column_nullable.cpp
@@ -114,6 +114,24 @@ void ColumnNullable::update_hashes_with_value(uint64_t*
__restrict hashes,
}
}
+// Folds row n into the running xxHash state: a row flagged in the null map is
+// hashed through the null helper, any other row is delegated to the nested column.
+void ColumnNullable::update_xxHash_with_value(size_t n, uint64_t& hash) const {
+    auto* __restrict null_flags = assert_cast<const ColumnUInt8&>(*null_map).get_data().data();
+    const bool row_is_null = null_flags[n] != 0;
+    if (!row_is_null) {
+        nested_column->update_xxHash_with_value(n, hash);
+        return;
+    }
+    hash = HashUtil::xxHash64NullWithSeed(hash);
+}
+
+// Folds row n into the running CRC state: a row flagged in the null map is
+// hashed through the CRC null helper, any other row is delegated to the nested column.
+void ColumnNullable::update_crc_with_value(size_t n, uint64_t& crc) const {
+    auto* __restrict real_null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data().data();
+    if (real_null_data[n] != 0) {
+        crc = HashUtil::zlib_crc_hash_null(crc);
+    } else {
+        // Must stay on the CRC path: delegating to update_xxHash_with_value here
+        // would mix an xxHash step into a CRC chain for every non-null row.
+        nested_column->update_crc_with_value(n, crc);
+    }
+}
+
MutableColumnPtr ColumnNullable::clone_resized(size_t new_size) const {
MutableColumnPtr new_nested_col =
get_nested_column().clone_resized(new_size);
auto new_null_map = ColumnUInt8::create();
diff --git a/be/src/vec/columns/column_nullable.h
b/be/src/vec/columns/column_nullable.h
index fe34724954..9f29965a32 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -186,6 +186,8 @@ public:
const uint8_t* __restrict null_data) const
override;
void update_hashes_with_value(uint64_t* __restrict hashes,
const uint8_t* __restrict null_data) const
override;
+ void update_xxHash_with_value(size_t n, uint64_t& hash) const override;
+ void update_crc_with_value(size_t n, uint64_t& crc) const override;
void get_extremes(Field& min, Field& max) const override;
MutableColumns scatter(ColumnIndex num_columns, const Selector& selector)
const override {
diff --git a/be/src/vec/columns/column_string.h
b/be/src/vec/columns/column_string.h
index 373e676bf1..b4caa7b985 100644
--- a/be/src/vec/columns/column_string.h
+++ b/be/src/vec/columns/column_string.h
@@ -342,6 +342,18 @@ public:
SIP_HASHES_FUNCTION_COLUMN_IMPL();
}
+    // Folds string row n into the running xxHash64 state by hashing size_at(n)
+    // bytes of `chars` starting at offset_at(n).
+    void update_xxHash_with_value(size_t n, uint64_t& hash) const override {
+        const size_t length = size_at(n);
+        const size_t begin = offset_at(n);
+        const char* bytes = reinterpret_cast<const char*>(&chars[begin]);
+        hash = HashUtil::xxHash64WithSeed(bytes, length, hash);
+    }
+
+    // Folds string row n into the running CRC state by hashing the bytes
+    // returned by get_data_at(n).
+    void update_crc_with_value(size_t n, uint64_t& crc) const override {
+        const auto value = get_data_at(n);
+        crc = HashUtil::zlib_crc_hash(value.data, value.size, crc);
+    }
+
void update_crcs_with_value(std::vector<uint64_t>& hashes, PrimitiveType
type,
const uint8_t* __restrict null_data) const
override;
diff --git a/be/src/vec/columns/column_vector.h
b/be/src/vec/columns/column_vector.h
index c944827fbc..1c18e810b0 100644
--- a/be/src/vec/columns/column_vector.h
+++ b/be/src/vec/columns/column_vector.h
@@ -243,6 +243,25 @@ public:
const uint8_t* null_map,
size_t max_row_byte_size) const override;
+    // Folds row n into the running xxHash64 state by hashing the sizeof(T)
+    // bytes stored at data[n].
+    void update_xxHash_with_value(size_t n, uint64_t& hash) const override {
+        const char* value_bytes = reinterpret_cast<const char*>(&data[n]);
+        hash = HashUtil::xxHash64WithSeed(value_bytes, sizeof(T), hash);
+    }
+
+    // Folds row n into the running CRC state. Int64 columns reporting
+    // is_date_type() or is_datetime_type() are hashed over the text that
+    // VecDateTimeValue::to_buffer() writes; every other case hashes the
+    // sizeof(T) bytes stored at data[n].
+    void update_crc_with_value(size_t n, uint64_t& crc) const override {
+        if constexpr (std::is_same_v<T, Int64>) {
+            if (this->is_date_type() || this->is_datetime_type()) {
+                char text[64];
+                const VecDateTimeValue& as_datetime = (const VecDateTimeValue&)data[n];
+                auto text_len = as_datetime.to_buffer(text);
+                crc = HashUtil::zlib_crc_hash(text, text_len, crc);
+                return;
+            }
+        }
+        crc = HashUtil::zlib_crc_hash(&data[n], sizeof(T), crc);
+    }
+
void update_hash_with_value(size_t n, SipHash& hash) const override;
void update_hashes_with_value(std::vector<SipHash>& hashes,
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp
b/be/src/vec/sink/vdata_stream_sender.cpp
index 9768f61244..d596d78e20 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -569,6 +569,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block*
block) {
std::vector<SipHash> siphashs(rows);
// result[j] means column index, i means rows index
for (int j = 0; j < result_size; ++j) {
+ // complex types mostly do not implement the get_data_at() method, which column_const will call
block->get_by_position(result[j]).column->update_hashes_with_value(siphashs);
}
for (int i = 0; i < rows; i++) {
@@ -578,6 +579,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block*
block) {
SCOPED_TIMER(_split_block_hash_compute_timer);
// result[j] means column index, i means rows index, here to
calculate the xxhash value
for (int j = 0; j < result_size; ++j) {
+ // complex types mostly do not implement the get_data_at() method, which column_const will call
block->get_by_position(result[j]).column->update_hashes_with_value(hashes);
}
@@ -590,6 +592,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block*
block) {
RETURN_IF_ERROR(channel_add_rows(_channels, element_size, hashes,
rows, block));
} else {
for (int j = 0; j < result_size; ++j) {
+ // complex types mostly do not implement the get_data_at() method, which column_const will call
block->get_by_position(result[j]).column->update_crcs_with_value(
hash_vals,
_partition_expr_ctxs[j]->root()->type().type);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]