This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 3d23f8b2b5 [Opt](functions) Use preloaded cache to accelerate timezone parsing (#22694) (#23510)
3d23f8b2b5 is described below
commit 3d23f8b2b51503ba6f0e03977932751054709fcc
Author: zclllyybb <[email protected]>
AuthorDate: Fri Aug 25 21:27:17 2023 +0800
[Opt](functions) Use preloaded cache to accelerate timezone parsing (#22694) (#23510)
---
be/src/http/action/stream_load.cpp | 3 -
be/src/http/http_common.h | 1 -
be/src/runtime/exec_env.h | 8 +
be/src/runtime/exec_env_init.cpp | 4 +
be/src/util/timezone_utils.cpp | 177 ++++++++++++++++++++-
be/src/util/timezone_utils.h | 6 +-
be/src/vec/functions/function_cast.h | 42 +++--
be/src/vec/functions/function_convert_tz.h | 55 ++++---
be/src/vec/io/io_helper.h | 25 +--
be/src/vec/runtime/vdatetime_value.cpp | 48 +++---
be/src/vec/runtime/vdatetime_value.h | 12 +-
be/test/vec/function/function_time_test.cpp | 32 ----
.../datetimev2/test_tz_streamload.groovy | 2 +-
13 files changed, 294 insertions(+), 121 deletions(-)
diff --git a/be/src/http/action/stream_load.cpp
b/be/src/http/action/stream_load.cpp
index b31f4fe331..837a84f5a5 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -472,9 +472,6 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
if (!http_req->header(HTTP_TIMEZONE).empty()) {
request.__set_timezone(http_req->header(HTTP_TIMEZONE));
}
- if (!http_req->header(HTTP_TIME_ZONE).empty()) {
- request.__set_timezone(http_req->header(HTTP_TIME_ZONE));
- }
if (!http_req->header(HTTP_EXEC_MEM_LIMIT).empty()) {
try {
request.__set_execMemLimit(std::stoll(http_req->header(HTTP_EXEC_MEM_LIMIT)));
diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h
index afcbc3ff0d..616ae872f0 100644
--- a/be/src/http/http_common.h
+++ b/be/src/http/http_common.h
@@ -38,7 +38,6 @@ static const std::string HTTP_TEMP_PARTITIONS =
"temporary_partitions";
static const std::string HTTP_NEGATIVE = "negative";
static const std::string HTTP_STRICT_MODE = "strict_mode";
static const std::string HTTP_TIMEZONE = "timezone";
-static const std::string HTTP_TIME_ZONE = "time_zone";
static const std::string HTTP_EXEC_MEM_LIMIT = "exec_mem_limit";
static const std::string HTTP_JSONPATHS = "jsonpaths";
static const std::string HTTP_JSONROOT = "json_root";
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 40c489efb5..93db9b4f48 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -22,6 +22,7 @@
#include <algorithm>
#include <map>
#include <memory>
+#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <vector>
@@ -29,11 +30,13 @@
#include "common/status.h"
#include "olap/options.h"
#include "util/threadpool.h"
+#include "vec/common/hash_table/phmap_fwd_decl.h"
namespace doris {
namespace vectorized {
class VDataStreamMgr;
class ScannerScheduler;
+using ZoneList = std::unordered_map<std::string, cctz::time_zone>;
} // namespace vectorized
namespace pipeline {
class TaskScheduler;
@@ -172,6 +175,8 @@ public:
HeartbeatFlags* heartbeat_flags() { return _heartbeat_flags; }
doris::vectorized::ScannerScheduler* scanner_scheduler() { return
_scanner_scheduler; }
FileMetaCache* file_meta_cache() { return _file_meta_cache; }
+ vectorized::ZoneList& global_zone_cache() { return *_global_zone_cache; }
+ std::shared_mutex& zone_cache_rw_lock() { return _zone_cache_rw_lock; }
// only for unit test
void set_master_info(TMasterInfo* master_info) { this->_master_info =
master_info; }
@@ -255,6 +260,9 @@ private:
BlockSpillManager* _block_spill_mgr = nullptr;
// To save meta info of external file, such as parquet footer.
FileMetaCache* _file_meta_cache = nullptr;
+
+ std::unique_ptr<vectorized::ZoneList> _global_zone_cache;
+ std::shared_mutex _zone_cache_rw_lock;
};
template <>
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 6b7e337ecd..5fca5e325e 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -77,6 +77,7 @@
#include "util/parse_util.h"
#include "util/pretty_printer.h"
#include "util/threadpool.h"
+#include "util/timezone_utils.h"
#include "vec/exec/scan/scanner_scheduler.h"
#include "vec/runtime/vdata_stream_mgr.h"
@@ -116,6 +117,9 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths) {
TimezoneUtils::load_timezone_names();
+ _global_zone_cache = std::make_unique<vectorized::ZoneList>();
+ TimezoneUtils::load_timezones_to_cache(*_global_zone_cache);
+
ThreadPoolBuilder("SendBatchThreadPool")
.set_min_threads(config::send_batch_thread_pool_thread_num)
.set_max_threads(config::send_batch_thread_pool_thread_num)
diff --git a/be/src/util/timezone_utils.cpp b/be/src/util/timezone_utils.cpp
index 364bf53bbc..112dc74eb0 100644
--- a/be/src/util/timezone_utils.cpp
+++ b/be/src/util/timezone_utils.cpp
@@ -14,13 +14,18 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
#include "util/timezone_utils.h"
#include <cctz/civil_time.h>
#include <cctz/time_zone.h>
+#include <fcntl.h>
+#include <glog/logging.h>
#include <re2/stringpiece.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
#include <boost/algorithm/string.hpp>
#include <cctype>
@@ -68,6 +73,175 @@ void TimezoneUtils::load_timezone_names() {
}
}
}
+
+namespace { // functions use only in this file
+
+template <typename T>
+T swapEndianness(T value) {
+ constexpr int numBytes = sizeof(T);
+ T result = 0;
+ for (int i = 0; i < numBytes; ++i) {
+ result = (result << 8) | ((value >> (8 * i)) & 0xFF);
+ }
+ return result;
+}
+
+template <typename T>
+T next_from_charstream(int8_t*& src) {
+ T value = *reinterpret_cast<T*>(src);
+ src += sizeof(T) / sizeof(int8_t);
+ if constexpr (std::endian::native == std::endian::little) {
+ return swapEndianness(
+ value); // timezone information files use network endianess,
which is big-endian
+ } else if (std::endian::native == std::endian::big) {
+ return value;
+ } else {
+ LOG(FATAL) << "Unknown endianess";
+ }
+ __builtin_unreachable();
+}
+
+std::pair<int8_t*, int> load_file_to_memory(const std::string& path) {
+ int fd = open(path.c_str(), O_RDONLY);
+ int len = lseek(fd, 0, SEEK_END); // bytes
+
+ int8_t* addr = (int8_t*)mmap(nullptr, len, PROT_READ, MAP_PRIVATE, fd, 0);
+ int8_t* data = new int8_t[len];
+ memcpy(data, addr, len);
+ close(fd);
+ munmap(addr, len);
+
+ return {data, len};
+}
+
+struct alignas(alignof(uint8_t)) ttinfo {
+ uint8_t tt_utoff[4]; // need force cast to int32_t
+ uint8_t tt_isdst;
+ uint8_t tt_desigidx;
+};
+constexpr static int TTINFO_SIZE = sizeof(ttinfo);
+static_assert(TTINFO_SIZE == 6);
+
+struct real_ttinfo {
+ [[maybe_unused]] real_ttinfo() = default; // actually it's used. how
stupid compiler!
+ real_ttinfo(const ttinfo& arg) {
+ diff_seconds = *reinterpret_cast<const int32_t*>(arg.tt_utoff + 0);
+ is_dst = arg.tt_isdst;
+ name_index = arg.tt_desigidx;
+ }
+
+ int32_t diff_seconds; // to UTC
+ bool is_dst;
+ uint8_t name_index;
+};
+
+template <>
+ttinfo next_from_charstream<ttinfo>(int8_t*& src) {
+ ttinfo value = *reinterpret_cast<ttinfo*>(src);
+ src += TTINFO_SIZE;
+ if constexpr (std::endian::native == std::endian::little) {
+ std::swap(value.tt_utoff[0], value.tt_utoff[3]);
+ std::swap(value.tt_utoff[1], value.tt_utoff[2]);
+ }
+ return value;
+}
+
+/*
+ * follow the rule of tzfile(5) which defined in
https://man7.org/linux/man-pages/man5/tzfile.5.html.
+ * should change when it changes.
+ */
+bool parse_load_timezone(vectorized::ZoneList& zone_list, int8_t* data, int
len,
+ bool first_time = true) {
+ int8_t* begin_pos = data;
+ /* HEADERS */
+ if (memcmp(data, "TZif", 4) != 0) [[unlikely]] { // magic number
+ return false;
+ }
+ data += 4;
+
+ // if version = 2, the whole header&data will repeat itself one time.
+ int8_t version = next_from_charstream<int8_t>(data) - '0';
+ data += 15; // null bits
+ int32_t ut_count = next_from_charstream<int32_t>(data);
+ int32_t wall_count = next_from_charstream<int32_t>(data);
+ int32_t leap_count = next_from_charstream<int32_t>(data);
+ int32_t trans_time_count = next_from_charstream<int32_t>(data);
+ int32_t type_count = next_from_charstream<int32_t>(data);
+ int32_t char_count = next_from_charstream<int32_t>(data);
+
+ /* HEADERS end, FIELDS begin*/
+ // transaction time points, which we don't need
+ data += (first_time ? 5 : 9) * trans_time_count;
+
+ // timezones
+ std::vector<real_ttinfo> timezones(type_count);
+ for (int i = 0; i < type_count; i++) {
+ ttinfo tz_data = next_from_charstream<ttinfo>(data);
+ timezones[i] = tz_data; // cast by c'tor
+ }
+
+ // timezone names
+ const char* name_zone = (char*)data;
+ data += char_count;
+
+ // concate names
+ for (auto& tz : timezones) {
+ int len = strlen(name_zone + tz.name_index);
+ zone_list.emplace(std::string {name_zone + tz.name_index, name_zone +
tz.name_index + len},
+
cctz::fixed_time_zone(cctz::seconds(tz.diff_seconds)));
+ }
+
+ // the second part.
+ if (version == 2 && first_time) {
+ // leap seconds, standard/wall indicators, UT/local indicators, which
we don't need
+ data += 4 * leap_count + wall_count + ut_count;
+
+ return (data < begin_pos + len) &&
+ parse_load_timezone(zone_list, data, len - (data - begin_pos),
false);
+ }
+
+ return true;
+}
+
+} // namespace
+
+void TimezoneUtils::load_timezones_to_cache(vectorized::ZoneList& cache_list) {
+ cache_list["CST"] = cctz::fixed_time_zone(cctz::seconds(8 * 3600));
+
+ std::string base_str;
+ const char* tzdir = "/usr/share/zoneinfo"; // default
+ // try get from System
+ char* tzdir_env = std::getenv("TZDIR");
+ if (tzdir_env && *tzdir_env) {
+ tzdir = tzdir_env;
+ }
+
+ base_str += tzdir;
+ base_str += '/';
+
+ const auto root_path = std::filesystem::path {base_str};
+ std::set<std::string> ignore_paths = {"posix", "right"}; // duplications
+
+ for (std::filesystem::recursive_directory_iterator it {base_str}; it !=
end(it); it++) {
+ const auto& dir_entry = *it;
+ if (dir_entry.is_regular_file()) {
+ auto tz_name = relative(dir_entry, base_str);
+
+ auto tz_path = dir_entry.path().string();
+ auto [handle, length] = load_file_to_memory(tz_path);
+
+ parse_load_timezone(cache_list, handle, length);
+
+ delete[] handle;
+ } else if (dir_entry.is_directory() &&
ignore_paths.contains(dir_entry.path().filename())) {
+ it.disable_recursion_pending();
+ }
+ }
+
+ cache_list.erase("LMT"); // local mean time for every timezone
+ LOG(INFO) << "Read " << cache_list.size() << " timezones.";
+}
+
bool TimezoneUtils::find_cctz_time_zone(const std::string& timezone,
cctz::time_zone& ctz) {
auto timezone_lower = boost::algorithm::to_lower_copy(timezone);
re2::StringPiece value;
@@ -121,6 +295,7 @@ bool TimezoneUtils::find_cctz_time_zone(const std::string&
timezone, cctz::time_
} else {
auto it = timezone_names_map_.find(timezone_lower);
if (it == timezone_names_map_.end()) {
+ VLOG_DEBUG << "Illegal timezone " << timezone_lower;
return false;
}
tz_parsed = cctz::load_time_zone(it->second, &ctz);
diff --git a/be/src/util/timezone_utils.h b/be/src/util/timezone_utils.h
index d9e5ee82d8..0f3a6dcc38 100644
--- a/be/src/util/timezone_utils.h
+++ b/be/src/util/timezone_utils.h
@@ -29,12 +29,16 @@ class time_zone;
namespace doris {
+namespace vectorized {
+using ZoneList = std::unordered_map<std::string, cctz::time_zone>;
+}
+
class TimezoneUtils {
public:
static void load_timezone_names();
+ static void load_timezones_to_cache(vectorized::ZoneList& cache_list);
static bool find_cctz_time_zone(const std::string& timezone,
cctz::time_zone& ctz);
-public:
static const std::string default_time_zone;
private:
diff --git a/be/src/vec/functions/function_cast.h
b/be/src/vec/functions/function_cast.h
index 1907ad5988..e0e7f8d0fe 100644
--- a/be/src/vec/functions/function_cast.h
+++ b/be/src/vec/functions/function_cast.h
@@ -43,6 +43,7 @@
// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/status.h"
+#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "udf/udf.h"
#include "util/jsonb_document.h"
@@ -124,13 +125,13 @@ struct TimeCast {
// '300' -> 00:03:00 '20:23' -> 20:23:00 '20:23:24' -> 20:23:24
template <typename T>
static bool try_parse_time(char* s, size_t len, T& x, const
cctz::time_zone& local_time_zone,
- ZoneList& time_zone_cache) {
+ ZoneList& time_zone_cache, std::shared_mutex&
cache_lock) {
/// TODO: Maybe we can move Timecast to the io_helper.
if (try_as_time(s, len, x, local_time_zone)) {
return true;
} else {
if (VecDateTimeValue dv {};
- dv.from_date_str(s, len, local_time_zone, time_zone_cache)) {
+ dv.from_date_str(s, len, local_time_zone, time_zone_cache,
&cache_lock)) {
// can be parse as a datetime
x = dv.hour() * 3600 + dv.minute() * 60 + dv.second();
return true;
@@ -852,6 +853,7 @@ struct NameToDateTime {
template <typename DataType, typename Additions = void*, typename FromDataType
= void*>
bool try_parse_impl(typename DataType::FieldType& x, ReadBuffer& rb,
const cctz::time_zone& local_time_zone, ZoneList&
time_zone_cache,
+ std::shared_mutex& cache_lock,
Additions additions [[maybe_unused]] = Additions()) {
if constexpr (IsDateTimeType<DataType>) {
return try_read_datetime_text(x, rb, local_time_zone, time_zone_cache);
@@ -862,12 +864,13 @@ bool try_parse_impl(typename DataType::FieldType& x,
ReadBuffer& rb,
}
if constexpr (IsDateV2Type<DataType>) {
- return try_read_date_v2_text(x, rb, local_time_zone, time_zone_cache);
+ return try_read_date_v2_text(x, rb, local_time_zone, time_zone_cache,
cache_lock);
}
if constexpr (IsDateTimeV2Type<DataType>) {
UInt32 scale = additions;
- return try_read_datetime_v2_text(x, rb, local_time_zone,
time_zone_cache, scale);
+ return try_read_datetime_v2_text(x, rb, local_time_zone,
time_zone_cache, cache_lock,
+ scale);
}
if constexpr (std::is_same_v<DataTypeString, FromDataType> &&
@@ -876,7 +879,8 @@ bool try_parse_impl(typename DataType::FieldType& x,
ReadBuffer& rb,
auto len = rb.count();
auto s = rb.position();
rb.position() = rb.end(); // make is_all_read = true
- auto ret = TimeCast::try_parse_time(s, len, x, local_time_zone,
time_zone_cache);
+ auto ret =
+ TimeCast::try_parse_time(s, len, x, local_time_zone,
time_zone_cache, cache_lock);
x *= (1000 * 1000);
return ret;
}
@@ -1326,10 +1330,8 @@ struct ConvertThroughParsing {
ColumnDecimal<ToFieldType>,
ColumnVector<ToFieldType>>;
// For datelike type, only from FunctionConvertFromString. So we can
use its' context。
- auto convert_ctx = reinterpret_cast<ConvertTzCtx*>(
-
context->get_function_state(FunctionContext::FunctionStateScope::THREAD_LOCAL));
- ZoneList time_zone_cache_;
- auto& time_zone_cache = convert_ctx ? convert_ctx->time_zone_cache :
time_zone_cache_;
+ ZoneList& time_zone_cache =
context->state()->exec_env()->global_zone_cache();
+ std::shared_mutex& cache_lock =
context->state()->exec_env()->zone_cache_rw_lock();
const IColumn* col_from =
block.get_by_position(arguments[0]).column.get();
const ColumnString* col_from_string =
check_and_get_column<ColumnString>(col_from);
@@ -1379,18 +1381,19 @@ struct ConvertThroughParsing {
bool parsed;
if constexpr (IsDataTypeDecimal<ToDataType>) {
- parsed = try_parse_impl<ToDataType>(vec_to[i], read_buffer,
-
context->state()->timezone_obj(),
- time_zone_cache,
vec_to.get_scale());
+ parsed = try_parse_impl<ToDataType>(
+ vec_to[i], read_buffer,
context->state()->timezone_obj(), time_zone_cache,
+ cache_lock, vec_to.get_scale());
} else if constexpr (IsDataTypeDateTimeV2<ToDataType>) {
auto type = check_and_get_data_type<DataTypeDateTimeV2>(
block.get_by_position(result).type.get());
parsed = try_parse_impl<ToDataType>(vec_to[i], read_buffer,
context->state()->timezone_obj(),
- time_zone_cache,
type->get_scale());
+ time_zone_cache,
cache_lock, type->get_scale());
} else {
parsed = try_parse_impl<ToDataType, void*, FromDataType>(
- vec_to[i], read_buffer,
context->state()->timezone_obj(), time_zone_cache);
+ vec_to[i], read_buffer,
context->state()->timezone_obj(), time_zone_cache,
+ cache_lock);
}
(*vec_null_map_to)[i] = !parsed || !is_all_read(read_buffer);
current_offset = next_offset;
@@ -1427,14 +1430,6 @@ public:
ColumnNumbers get_arguments_that_are_always_constant() const override {
return {1}; }
- Status open(FunctionContext* context, FunctionContext::FunctionStateScope
scope) override {
- if (scope != FunctionContext::THREAD_LOCAL) {
- return Status::OK();
- }
- context->set_function_state(scope, std::make_unique<ConvertTzCtx>());
- return Status::OK();
- }
-
// This function should not be called for get DateType Ptr
// using the FunctionCast::get_return_type_impl
DataTypePtr get_return_type_impl(const ColumnsWithTypeAndName& arguments)
const override {
@@ -1442,8 +1437,9 @@ public:
if constexpr (IsDataTypeDecimal<ToDataType>) {
LOG(FATAL) << "Someting wrong with toDecimalNNOrZero() or
toDecimalNNOrNull()";
- } else
+ } else {
res = std::make_shared<ToDataType>();
+ }
return res;
}
diff --git a/be/src/vec/functions/function_convert_tz.h
b/be/src/vec/functions/function_convert_tz.h
index 7c6d0be442..8ff3505aca 100644
--- a/be/src/vec/functions/function_convert_tz.h
+++ b/be/src/vec/functions/function_convert_tz.h
@@ -23,11 +23,15 @@
#include <map>
#include <memory>
+#include <mutex>
+#include <shared_mutex>
#include <string>
#include <type_traits>
#include <utility>
#include "common/status.h"
+#include "runtime/exec_env.h"
+#include "runtime/runtime_state.h"
#include "udf/udf.h"
#include "util/binary_cast.hpp"
#include "util/timezone_utils.h"
@@ -65,9 +69,7 @@ class DateV2Value;
namespace doris::vectorized {
-struct ConvertTzCtx {
- ZoneList time_zone_cache;
-};
+using ZoneList = std::unordered_map<std::string, cctz::time_zone>;
template <typename DateValueType, typename ArgType>
struct ConvertTZImpl {
@@ -90,10 +92,8 @@ struct ConvertTZImpl {
const ColumnString* from_tz_column, const
ColumnString* to_tz_column,
ReturnColumnType* result_column, NullMap&
result_null_map,
size_t input_rows_count) {
- auto convert_ctx = reinterpret_cast<ConvertTzCtx*>(
-
context->get_function_state(FunctionContext::FunctionStateScope::THREAD_LOCAL));
- ZoneList time_zone_cache_;
- auto& time_zone_cache = convert_ctx ? convert_ctx->time_zone_cache :
time_zone_cache_;
+ ZoneList& time_zone_cache =
context->state()->exec_env()->global_zone_cache();
+ std::shared_mutex& cache_lock =
context->state()->exec_env()->zone_cache_rw_lock();
for (size_t i = 0; i < input_rows_count; i++) {
if (result_null_map[i]) {
result_column->insert_default();
@@ -101,8 +101,8 @@ struct ConvertTZImpl {
}
auto from_tz = from_tz_column->get_data_at(i).to_string();
auto to_tz = to_tz_column->get_data_at(i).to_string();
- execute_inner_loop(date_column, time_zone_cache, from_tz, to_tz,
result_column,
- result_null_map, i);
+ execute_inner_loop(date_column, time_zone_cache, cache_lock,
from_tz, to_tz,
+ result_column, result_null_map, i);
}
}
@@ -110,10 +110,8 @@ struct ConvertTZImpl {
const ColumnString* from_tz_column,
const ColumnString* to_tz_column,
ReturnColumnType* result_column,
NullMap& result_null_map, size_t
input_rows_count) {
- auto convert_ctx = reinterpret_cast<ConvertTzCtx*>(
-
context->get_function_state(FunctionContext::FunctionStateScope::THREAD_LOCAL));
- ZoneList time_zone_cache_;
- auto& time_zone_cache = convert_ctx ? convert_ctx->time_zone_cache :
time_zone_cache_;
+ ZoneList& time_zone_cache =
context->state()->exec_env()->global_zone_cache();
+ std::shared_mutex& cache_lock =
context->state()->exec_env()->zone_cache_rw_lock();
auto from_tz = from_tz_column->get_data_at(0).to_string();
auto to_tz = to_tz_column->get_data_at(0).to_string();
@@ -122,33 +120,46 @@ struct ConvertTZImpl {
result_column->insert_default();
continue;
}
- execute_inner_loop(date_column, time_zone_cache, from_tz, to_tz,
result_column,
- result_null_map, i);
+ execute_inner_loop(date_column, time_zone_cache, cache_lock,
from_tz, to_tz,
+ result_column, result_null_map, i);
}
}
static void execute_inner_loop(const ColumnType* date_column, ZoneList&
time_zone_cache,
- const std::string& from_tz, const
std::string& to_tz,
- ReturnColumnType* result_column, NullMap&
result_null_map,
- const size_t index_now) {
+ std::shared_mutex& cache_lock, const
std::string& from_tz,
+ const std::string& to_tz, ReturnColumnType*
result_column,
+ NullMap& result_null_map, const size_t
index_now) {
DateValueType ts_value =
binary_cast<NativeType,
DateValueType>(date_column->get_element(index_now));
int64_t timestamp;
+ cache_lock.lock_shared();
if (time_zone_cache.find(from_tz) == time_zone_cache.cend()) {
+ cache_lock.unlock_shared();
+ std::unique_lock<std::shared_mutex> lock_(cache_lock);
+ //TODO: the lock upgrade could be done in find_... function only
when we push value into the hashmap
if (!TimezoneUtils::find_cctz_time_zone(from_tz,
time_zone_cache[from_tz])) {
+ time_zone_cache.erase(to_tz);
result_null_map[index_now] = true;
result_column->insert_default();
return;
}
+ } else {
+ cache_lock.unlock_shared();
}
+ cache_lock.lock_shared();
if (time_zone_cache.find(to_tz) == time_zone_cache.cend()) {
+ cache_lock.unlock_shared();
+ std::unique_lock<std::shared_mutex> lock_(cache_lock);
if (!TimezoneUtils::find_cctz_time_zone(to_tz,
time_zone_cache[to_tz])) {
+ time_zone_cache.erase(to_tz);
result_null_map[index_now] = true;
result_column->insert_default();
return;
}
+ } else {
+ cache_lock.unlock_shared();
}
if (!ts_value.unix_timestamp(×tamp, time_zone_cache[from_tz])) {
@@ -201,14 +212,6 @@ public:
bool use_default_implementation_for_nulls() const override { return false;
}
- Status open(FunctionContext* context, FunctionContext::FunctionStateScope
scope) override {
- if (scope != FunctionContext::THREAD_LOCAL) {
- return Status::OK();
- }
- context->set_function_state(scope, std::make_unique<ConvertTzCtx>());
- return Status::OK();
- }
-
Status close(FunctionContext* context, FunctionContext::FunctionStateScope
scope) override {
return Status::OK();
}
diff --git a/be/src/vec/io/io_helper.h b/be/src/vec/io/io_helper.h
index 4d8b69d1fd..547277bcc4 100644
--- a/be/src/vec/io/io_helper.h
+++ b/be/src/vec/io/io_helper.h
@@ -21,6 +21,7 @@
#include <snappy/snappy.h>
#include <iostream>
+#include <unordered_map>
#include "common/exception.h"
#include "util/binary_cast.hpp"
@@ -42,7 +43,7 @@ static constexpr size_t DEFAULT_MAX_STRING_SIZE = 1073741824;
// 1GB
static constexpr size_t DEFAULT_MAX_JSON_SIZE = 1073741824; // 1GB
static constexpr auto WRITE_HELPERS_MAX_INT_WIDTH = 40U;
-using ZoneList = std::map<std::string, cctz::time_zone>;
+using ZoneList = std::unordered_map<std::string, cctz::time_zone>;
inline std::string int128_to_string(__int128_t value) {
fmt::memory_buffer buffer;
@@ -337,10 +338,11 @@ bool read_date_v2_text_impl(T& x, ReadBuffer& buf) {
template <typename T>
bool read_date_v2_text_impl(T& x, ReadBuffer& buf, const cctz::time_zone&
local_time_zone,
- ZoneList& time_zone_cache) {
+ ZoneList& time_zone_cache, std::shared_mutex&
cache_lock) {
static_assert(std::is_same_v<UInt32, T>);
auto dv = binary_cast<UInt32, DateV2Value<DateV2ValueType>>(x);
- auto ans = dv.from_date_str(buf.position(), buf.count(), local_time_zone,
time_zone_cache);
+ auto ans = dv.from_date_str(buf.position(), buf.count(), local_time_zone,
time_zone_cache,
+ &cache_lock);
// only to match the is_all_read() check to prevent return null
buf.position() = buf.end();
@@ -362,11 +364,12 @@ bool read_datetime_v2_text_impl(T& x, ReadBuffer& buf,
UInt32 scale = -1) {
template <typename T>
bool read_datetime_v2_text_impl(T& x, ReadBuffer& buf, const cctz::time_zone&
local_time_zone,
- ZoneList& time_zone_cache, UInt32 scale = -1) {
+ ZoneList& time_zone_cache, std::shared_mutex&
cache_lock,
+ UInt32 scale = -1) {
static_assert(std::is_same_v<UInt64, T>);
auto dv = binary_cast<UInt64, DateV2Value<DateTimeV2ValueType>>(x);
- auto ans =
- dv.from_date_str(buf.position(), buf.count(), local_time_zone,
time_zone_cache, scale);
+ auto ans = dv.from_date_str(buf.position(), buf.count(), local_time_zone,
time_zone_cache,
+ &cache_lock, scale);
// only to match the is_all_read() check to prevent return null
buf.position() = buf.end();
@@ -463,13 +466,15 @@ bool try_read_date_text(T& x, ReadBuffer& in, const
cctz::time_zone& local_time_
template <typename T>
bool try_read_date_v2_text(T& x, ReadBuffer& in, const cctz::time_zone&
local_time_zone,
- ZoneList& time_zone_cache) {
- return read_date_v2_text_impl<T>(x, in, local_time_zone, time_zone_cache);
+ ZoneList& time_zone_cache, std::shared_mutex&
cache_lock) {
+ return read_date_v2_text_impl<T>(x, in, local_time_zone, time_zone_cache,
cache_lock);
}
template <typename T>
bool try_read_datetime_v2_text(T& x, ReadBuffer& in, const cctz::time_zone&
local_time_zone,
- ZoneList& time_zone_cache, UInt32 scale) {
- return read_datetime_v2_text_impl<T>(x, in, local_time_zone,
time_zone_cache, scale);
+ ZoneList& time_zone_cache, std::shared_mutex&
cache_lock,
+ UInt32 scale) {
+ return read_datetime_v2_text_impl<T>(x, in, local_time_zone,
time_zone_cache, cache_lock,
+ scale);
}
} // namespace doris::vectorized
diff --git a/be/src/vec/runtime/vdatetime_value.cpp
b/be/src/vec/runtime/vdatetime_value.cpp
index f701e96f30..86d685a0b9 100644
--- a/be/src/vec/runtime/vdatetime_value.cpp
+++ b/be/src/vec/runtime/vdatetime_value.cpp
@@ -85,18 +85,19 @@ bool VecDateTimeValue::check_date(uint32_t year, uint32_t
month, uint32_t day) {
// YYYY-MM-DD HH-MM-DD.FFFFFF AM in default format
// 0 1 2 3 4 5 6 7
bool VecDateTimeValue::from_date_str(const char* date_str, int len) {
- return from_date_str_base(date_str, len, nullptr, nullptr);
+ return from_date_str_base(date_str, len, nullptr, nullptr, nullptr);
}
//parse timezone to get offset
bool VecDateTimeValue::from_date_str(const char* date_str, int len,
const cctz::time_zone& local_time_zone,
- ZoneList& time_zone_cache) {
- return from_date_str_base(date_str, len, &local_time_zone,
&time_zone_cache);
+ ZoneList& time_zone_cache,
std::shared_mutex* cache_lock) {
+ return from_date_str_base(date_str, len, &local_time_zone,
&time_zone_cache, cache_lock);
}
bool VecDateTimeValue::from_date_str_base(const char* date_str, int len,
const cctz::time_zone*
local_time_zone,
- ZoneList* time_zone_cache) {
+ ZoneList* time_zone_cache,
+ std::shared_mutex* cache_lock) {
const char* ptr = date_str;
const char* end = date_str + len;
// ONLY 2, 6 can follow by a space
@@ -165,13 +166,18 @@ bool VecDateTimeValue::from_date_str_base(const char*
date_str, int len,
return false;
}
auto get_tz_offset = [&](const std::string& str_tz,
- const cctz::time_zone* local_time_zone,
- ZoneList* time_zone_cache) -> long {
- // no lock needed because of the entity is of thread_local
+ const cctz::time_zone* local_time_zone)
-> long {
+ cache_lock->lock_shared();
if (time_zone_cache->find(str_tz) == time_zone_cache->end()) {
// not found
+ cache_lock->unlock_shared();
+ std::unique_lock<std::shared_mutex> lock_(*cache_lock);
+ //TODO: the lock upgrade could be done in find_...
function only when we push value into the hashmap
if (!TimezoneUtils::find_cctz_time_zone(str_tz,
(*time_zone_cache)[str_tz])) {
+ time_zone_cache->erase(str_tz);
throw Exception {ErrorCode::INVALID_ARGUMENT, ""};
}
+ } else {
+ cache_lock->unlock_shared();
}
auto given = cctz::convert(cctz::civil_second {},
(*time_zone_cache)[str_tz]);
auto local = cctz::convert(cctz::civil_second {},
*local_time_zone);
@@ -179,8 +185,8 @@ bool VecDateTimeValue::from_date_str_base(const char*
date_str, int len,
return std::chrono::duration_cast<std::chrono::seconds>(given
- local).count();
};
try {
- sec_offset = get_tz_offset(std::string {ptr, end},
local_time_zone,
- time_zone_cache); // use the whole
remain string
+ sec_offset = get_tz_offset(std::string {ptr, end},
+ local_time_zone); // use the whole
remain string
} catch ([[maybe_unused]] Exception& e) {
return false; // invalid format
}
@@ -1951,19 +1957,20 @@ void DateV2Value<T>::format_datetime(uint32_t*
date_val, bool* carry_bits) const
// 0 1 2 3 4 5 6 7
template <typename T>
bool DateV2Value<T>::from_date_str(const char* date_str, int len, int scale /*
= -1*/) {
- return from_date_str_base(date_str, len, scale, nullptr, nullptr);
+ return from_date_str_base(date_str, len, scale, nullptr, nullptr, nullptr);
}
// when we parse
template <typename T>
bool DateV2Value<T>::from_date_str(const char* date_str, int len,
const cctz::time_zone& local_time_zone,
- ZoneList& time_zone_cache, int scale /* =
-1*/) {
- return from_date_str_base(date_str, len, scale, &local_time_zone,
&time_zone_cache);
+ ZoneList& time_zone_cache,
std::shared_mutex* cache_lock,
+ int scale /* = -1*/) {
+ return from_date_str_base(date_str, len, scale, &local_time_zone,
&time_zone_cache, cache_lock);
}
template <typename T>
bool DateV2Value<T>::from_date_str_base(const char* date_str, int len, int
scale,
const cctz::time_zone* local_time_zone,
- ZoneList* time_zone_cache) {
+ ZoneList* time_zone_cache,
std::shared_mutex* cache_lock) {
const char* ptr = date_str;
const char* end = date_str + len;
// ONLY 2, 6 can follow by a space
@@ -2059,13 +2066,18 @@ bool DateV2Value<T>::from_date_str_base(const char*
date_str, int len, int scale
return false;
}
auto get_tz_offset = [&](const std::string& str_tz,
- const cctz::time_zone* local_time_zone,
- ZoneList* time_zone_cache) -> long {
- // no lock needed because of the entity is of thread_local
+ const cctz::time_zone* local_time_zone)
-> long {
+ cache_lock->lock_shared();
if (time_zone_cache->find(str_tz) == time_zone_cache->end()) {
// not found
+ cache_lock->unlock_shared();
+ std::unique_lock<std::shared_mutex> lock_(*cache_lock);
+ //TODO: the lock upgrade could be done in find_...
function only when we push value into the hashmap
if (!TimezoneUtils::find_cctz_time_zone(str_tz,
(*time_zone_cache)[str_tz])) {
+ time_zone_cache->erase(str_tz);
throw Exception {ErrorCode::INVALID_ARGUMENT, ""};
}
+ } else {
+ cache_lock->unlock_shared();
}
auto given = cctz::convert(cctz::civil_second {},
(*time_zone_cache)[str_tz]);
auto local = cctz::convert(cctz::civil_second {},
*local_time_zone);
@@ -2073,8 +2085,8 @@ bool DateV2Value<T>::from_date_str_base(const char*
date_str, int len, int scale
return std::chrono::duration_cast<std::chrono::seconds>(given
- local).count();
};
try {
- sec_offset = get_tz_offset(std::string {ptr, end},
local_time_zone,
- time_zone_cache); // use the whole
remain string
+ sec_offset = get_tz_offset(std::string {ptr, end},
+ local_time_zone); // use the whole
remain string
} catch ([[maybe_unused]] Exception& e) {
return false; // invalid format
}
diff --git a/be/src/vec/runtime/vdatetime_value.h
b/be/src/vec/runtime/vdatetime_value.h
index fa3638fb94..68b1b1ad58 100644
--- a/be/src/vec/runtime/vdatetime_value.h
+++ b/be/src/vec/runtime/vdatetime_value.h
@@ -34,6 +34,7 @@
#include "util/hash_util.hpp"
#include "util/time_lut.h"
#include "util/timezone_utils.h"
+#include "vec/common/hash_table/phmap_fwd_decl.h"
namespace cctz {
class time_zone;
@@ -43,7 +44,7 @@ namespace doris {
namespace vectorized {
-using ZoneList = std::map<std::string, cctz::time_zone>;
+using ZoneList = std::unordered_map<std::string, cctz::time_zone>;
enum TimeUnit {
MICROSECOND,
@@ -354,7 +355,7 @@ public:
// 'YYYYMMDDTHHMMSS'
bool from_date_str(const char* str, int len);
bool from_date_str(const char* str, int len, const cctz::time_zone&
local_time_zone,
- ZoneList& time_zone_cache);
+ ZoneList& time_zone_cache, std::shared_mutex*
cache_lock);
// Construct Date/Datetime type value from int64_t value.
// Return true if convert success. Otherwise return false.
@@ -693,7 +694,7 @@ private:
char* to_time_buffer(char* to) const;
bool from_date_str_base(const char* date_str, int len, const
cctz::time_zone* local_time_zone,
- ZoneList* time_zone_cache);
+ ZoneList* time_zone_cache, std::shared_mutex*
cache_lock);
int64_t to_date_int64() const;
int64_t to_time_int64() const;
@@ -815,7 +816,7 @@ public:
// 'YYYYMMDDTHHMMSS'
bool from_date_str(const char* str, int len, int scale = -1);
bool from_date_str(const char* str, int len, const cctz::time_zone&
local_time_zone,
- ZoneList& time_zone_cache, int scale = -1);
+ ZoneList& time_zone_cache, std::shared_mutex*
cache_lock, int scale = -1);
// Convert this value to string
// this will check type to decide which format to convert
@@ -1179,7 +1180,8 @@ private:
bool disable_lut = false);
bool from_date_str_base(const char* date_str, int len, int scale,
- const cctz::time_zone* local_time_zone, ZoneList*
time_zone_cache);
+ const cctz::time_zone* local_time_zone, ZoneList*
time_zone_cache,
+ std::shared_mutex* cache_lock);
// Used to construct from int value
int64_t standardize_timevalue(int64_t value);
diff --git a/be/test/vec/function/function_time_test.cpp
b/be/test/vec/function/function_time_test.cpp
index a24ab3f35c..e6281bc8f4 100644
--- a/be/test/vec/function/function_time_test.cpp
+++ b/be/test/vec/function/function_time_test.cpp
@@ -539,22 +539,6 @@ TEST(VTimestampFunctionsTest, makedate_test) {
check_function<DataTypeDate, true>(func_name, input_types, data_set);
}
-TEST(VTimestampFunctionsTest, convert_tz_test) {
- TimezoneUtils::load_timezone_names();
- std::string func_name = "convert_tz";
-
- InputTypeSet input_types = {TypeIndex::DateTime, TypeIndex::String,
TypeIndex::String};
-
- DataSet data_set = {
- {{DATETIME("2019-08-01 13:21:03"), STRING("Asia/Shanghai"),
- STRING("America/Los_Angeles")},
- str_to_date_time("2019-07-31 22:21:03", true)},
- {{DATETIME("2019-08-01 13:21:03"), STRING("+08:00"),
STRING("America/Los_Angeles")},
- str_to_date_time("2019-07-31 22:21:03", true)}};
-
- check_function<DataTypeDateTime, true>(func_name, input_types, data_set);
-}
-
TEST(VTimestampFunctionsTest, weekday_test) {
std::string func_name = "weekday";
@@ -1680,20 +1664,4 @@ TEST(VTimestampFunctionsTest, seconds_sub_v2_test) {
}
}
-TEST(VTimestampFunctionsTest, convert_tz_v2_test) {
- TimezoneUtils::load_timezone_names();
- std::string func_name = "convert_tz";
-
- InputTypeSet input_types = {TypeIndex::DateTimeV2, TypeIndex::String,
TypeIndex::String};
-
- DataSet data_set = {
- {{DATETIME("2019-08-01 13:21:03"), STRING("Asia/Shanghai"),
- STRING("America/Los_Angeles")},
- str_to_datetime_v2("2019-07-31 22:21:03", "%Y-%m-%d
%H:%i:%s.%f")},
- {{DATETIME("2019-08-01 13:21:03"), STRING("+08:00"),
STRING("America/Los_Angeles")},
- str_to_datetime_v2("2019-07-31 22:21:03", "%Y-%m-%d
%H:%i:%s.%f")}};
-
- check_function<DataTypeDateTimeV2, true>(func_name, input_types, data_set);
-}
-
} // namespace doris::vectorized
diff --git
a/regression-test/suites/datatype_p0/datetimev2/test_tz_streamload.groovy
b/regression-test/suites/datatype_p0/datetimev2/test_tz_streamload.groovy
index 359f432bd5..6e32facc83 100644
--- a/regression-test/suites/datatype_p0/datetimev2/test_tz_streamload.groovy
+++ b/regression-test/suites/datatype_p0/datetimev2/test_tz_streamload.groovy
@@ -51,7 +51,7 @@ suite("test_tz_streamload") {
streamLoad {
table "${table1}"
set 'column_separator', ','
- set 'time_zone', '+02:00'
+ set 'timezone', '+02:00'
file "test_tz_streamload.csv"
time 20000
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org