This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new 4909442d Implement datatype TIMESTAMP BLOB TEXT DATE (#532)
4909442d is described below
commit 4909442d90a4181f5957e4cb94438f48d96f16a9
Author: Hongzhi Gao <[email protected]>
AuthorDate: Wed Jul 9 14:14:01 2025 +0800
Implement datatype TIMESTAMP BLOB TEXT DATE (#532)
* Implement datatype TIMESTAMP BLOB TEXT
* format cpp code style
* format code style
* fix compilation error
* fix compilation error
* fix compilation error
* fix memory leak
* format code style
* remove unnecessary changes
---
cpp/src/common/allocator/byte_stream.h | 7 +-
cpp/src/common/container/byte_buffer.h | 6 +-
cpp/src/common/container/murmur_hash3.cc | 73 ++---
cpp/src/common/datatype/date_converter.h | 107 ++++++++
cpp/src/common/datatype/value.h | 3 +
cpp/src/common/db_common.h | 44 +++
cpp/src/common/global.cc | 4 +-
cpp/src/common/record.h | 41 ++-
cpp/src/common/row_record.h | 38 ++-
cpp/src/common/statistic.h | 300 ++++++++++++++++++++-
cpp/src/common/tablet.cc | 45 +++-
cpp/src/common/tsblock/tsblock.cc | 6 +-
cpp/src/common/tsblock/tuple_desc.cc | 10 +-
cpp/src/encoding/decoder_factory.h | 10 +-
cpp/src/encoding/encode_utils.h | 65 ++---
cpp/src/encoding/encoder_factory.h | 12 +-
cpp/src/reader/aligned_chunk_reader.cc | 101 +++++--
cpp/src/reader/aligned_chunk_reader.h | 11 +-
cpp/src/reader/bloom_filter.cc | 6 +-
cpp/src/reader/chunk_reader.cc | 4 +
cpp/src/reader/result_set.h | 16 ++
cpp/src/writer/chunk_writer.h | 39 ++-
cpp/src/writer/page_writer.h | 39 ++-
cpp/src/writer/tsfile_writer.cc | 128 ++++++---
cpp/src/writer/tsfile_writer.h | 11 +-
cpp/src/writer/value_chunk_writer.h | 46 +++-
cpp/src/writer/value_page_writer.h | 47 +++-
cpp/test/common/datatype/date_converter_test.cc | 119 ++++++++
cpp/test/common/record_test.cc | 11 -
cpp/test/common/row_record_test.cc | 15 --
cpp/test/reader/bloom_filter_test.cc | 1 -
.../writer/table_view/tsfile_writer_table_test.cc | 109 +++++++-
cpp/test/writer/tsfile_writer_test.cc | 47 +++-
33 files changed, 1228 insertions(+), 293 deletions(-)
diff --git a/cpp/src/common/allocator/byte_stream.h
b/cpp/src/common/allocator/byte_stream.h
index 47c5148f..6984ecff 100644
--- a/cpp/src/common/allocator/byte_stream.h
+++ b/cpp/src/common/allocator/byte_stream.h
@@ -1004,14 +1004,13 @@ class SerializationUtil {
return common::E_OK;
}
FORCE_INLINE static int write_var_int(int32_t i32, ByteStream &out) {
- // TODO 8byte to 4byte.
- // but in IoTDB java, it has only write_var_uint(i32)
- int ui32 = i32 << 1;
+ uint32_t ui32 = static_cast<uint32_t>(i32) << 1;
if (i32 < 0) {
ui32 = ~ui32;
}
- return do_write_var_uint(static_cast<uint32_t>(ui32), out);
+ return do_write_var_uint(ui32, out);
}
+
FORCE_INLINE static int read_var_int(int32_t &i32, ByteStream &in) {
int ret = common::E_OK;
uint32_t ui32;
diff --git a/cpp/src/common/container/byte_buffer.h
b/cpp/src/common/container/byte_buffer.h
index 967d3640..9e8ecb48 100644
--- a/cpp/src/common/container/byte_buffer.h
+++ b/cpp/src/common/container/byte_buffer.h
@@ -107,9 +107,9 @@ class ByteBuffer {
// for variable len value
FORCE_INLINE char *read(uint32_t offset, uint32_t *len) {
- // get len
- *len = *reinterpret_cast<uint32_t *>(&data_[offset]);
- // get value
+ uint32_t tmp;
+ std::memcpy(&tmp, data_ + offset, sizeof(tmp));
+ *len = tmp;
char *p = &data_[offset + variable_type_len_];
return p;
}
diff --git a/cpp/src/common/container/murmur_hash3.cc
b/cpp/src/common/container/murmur_hash3.cc
index 14a06022..25654233 100644
--- a/cpp/src/common/container/murmur_hash3.cc
+++ b/cpp/src/common/container/murmur_hash3.cc
@@ -22,84 +22,88 @@ namespace common {
/* ================ Murmur128Hash ================ */
// follow Java IoTDB exactly.
-int64_t Murmur128Hash::inner_hash(const char *buf, int32_t len, int64_t seed) {
- const int block_count = len >> 4; // as 128-bit blocks
- int64_t h1 = seed;
- int64_t h2 = seed;
- int64_t c1 = 0x87c37b91114253d5L;
- int64_t c2 = 0x4cf5ad432745937fL;
+int64_t Murmur128Hash::inner_hash(const char* buf, int32_t len, int64_t seed) {
+ const int32_t block_count = len >> 4;
+ uint64_t h1 = static_cast<uint64_t>(seed);
+ uint64_t h2 = static_cast<uint64_t>(seed);
+ const uint64_t c1 = 0x87c37b91114253d5ULL;
+ const uint64_t c2 = 0x4cf5ad432745937fULL;
+
+ // body blocks
+ for (int32_t i = 0; i < block_count; ++i) {
+ uint64_t k1 = get_block(buf, i * 2);
+ uint64_t k2 = get_block(buf, i * 2 + 1);
- for (int i = 0; i < block_count; i++) {
- int64_t k1 = get_block(buf, i * 2);
- int64_t k2 = get_block(buf, i * 2 + 1);
k1 *= c1;
k1 = rotl64(k1, 31);
k1 *= c2;
h1 ^= k1;
h1 = rotl64(h1, 27);
h1 += h2;
- h1 = h1 * 5 + 0x52dce729;
+ h1 = h1 * 5 + 0x52dce729ULL;
+
k2 *= c2;
k2 = rotl64(k2, 33);
k2 *= c1;
h2 ^= k2;
h2 = rotl64(h2, 31);
h2 += h1;
- h2 = h2 * 5 + 0x38495ab5;
+ h2 = h2 * 5 + 0x38495ab5ULL;
}
- int offset = block_count * 16;
- int64_t k1 = 0;
- int64_t k2 = 0;
+ // tail
+ const int32_t offset = block_count * 16;
+ uint64_t k1 = 0;
+ uint64_t k2 = 0;
switch (len & 15) {
case 15:
- k2 ^= ((int64_t)(buf[offset + 14])) << 48;
+ k2 ^= (uint64_t)(uint8_t)buf[offset + 14] << 48;
// fallthrough
case 14:
- k2 ^= ((int64_t)(buf[offset + 13])) << 40;
+ k2 ^= (uint64_t)(uint8_t)buf[offset + 13] << 40;
// fallthrough
case 13:
- k2 ^= ((int64_t)(buf[offset + 12])) << 32;
+ k2 ^= (uint64_t)(uint8_t)buf[offset + 12] << 32;
// fallthrough
case 12:
- k2 ^= ((int64_t)(buf[offset + 11])) << 24;
+ k2 ^= (uint64_t)(uint8_t)buf[offset + 11] << 24;
// fallthrough
case 11:
- k2 ^= ((int64_t)(buf[offset + 10])) << 16;
+ k2 ^= (uint64_t)(uint8_t)buf[offset + 10] << 16;
// fallthrough
case 10:
- k2 ^= ((int64_t)(buf[offset + 9])) << 8;
+ k2 ^= (uint64_t)(uint8_t)buf[offset + 9] << 8;
// fallthrough
case 9:
- k2 ^= buf[offset + 8];
+ k2 ^= (uint64_t)(uint8_t)buf[offset + 8];
k2 *= c2;
k2 = rotl64(k2, 33);
k2 *= c1;
h2 ^= k2;
// fallthrough
case 8:
- k1 ^= ((int64_t)buf[offset + 7]) << 56;
+ k1 ^= (uint64_t)(uint8_t)buf[offset + 7] << 56;
// fallthrough
case 7:
- k1 ^= ((int64_t)buf[offset + 6]) << 48;
+ k1 ^= (uint64_t)(uint8_t)buf[offset + 6] << 48;
// fallthrough
case 6:
- k1 ^= ((int64_t)buf[offset + 5]) << 40;
+ k1 ^= (uint64_t)(uint8_t)buf[offset + 5] << 40;
// fallthrough
case 5:
- k1 ^= ((int64_t)buf[offset + 4]) << 32;
+ k1 ^= (uint64_t)(uint8_t)buf[offset + 4] << 32;
// fallthrough
case 4:
- k1 ^= ((int64_t)buf[offset + 3]) << 24;
+ k1 ^= (uint64_t)(uint8_t)buf[offset + 3] << 24;
// fallthrough
case 3:
- k1 ^= ((int64_t)buf[offset + 2]) << 16;
+ k1 ^= (uint64_t)(uint8_t)buf[offset + 2] << 16;
// fallthrough
case 2:
- k1 ^= ((int64_t)buf[offset + 1]) << 8;
+ k1 ^= (uint64_t)(uint8_t)buf[offset + 1] << 8;
// fallthrough
case 1:
- k1 ^= buf[offset];
+ k1 ^= (uint64_t)(uint8_t)buf[offset];
k1 *= c1;
k1 = rotl64(k1, 31);
k1 *= c2;
@@ -110,18 +114,21 @@ int64_t Murmur128Hash::inner_hash(const char *buf,
int32_t len, int64_t seed) {
break;
}
- h1 ^= len;
- h2 ^= len;
+ // finalization
+ h1 ^= static_cast<uint64_t>(len);
+ h2 ^= static_cast<uint64_t>(len);
h1 += h2;
h2 += h1;
+
h1 = fmix(h1);
h2 = fmix(h2);
+
h1 += h2;
h2 += h1;
- return h1 + h2;
+ return static_cast<int64_t>(h1 + h2);
}
-int64_t Murmur128Hash::get_block(const char *buf, int32_t index) {
+int64_t Murmur128Hash::get_block(const char* buf, int32_t index) {
int block_offset = index << 3;
int64_t res = 0;
res += ((int64_t)(buf[block_offset + 0] & 0xFF));
diff --git a/cpp/src/common/datatype/date_converter.h
b/cpp/src/common/datatype/date_converter.h
new file mode 100644
index 00000000..3234c775
--- /dev/null
+++ b/cpp/src/common/datatype/date_converter.h
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef COMMON_DATATYPE_DATE_CONVERTER_H
+#define COMMON_DATATYPE_DATE_CONVERTER_H
+
+#include <cstdint>
+#include <ctime>
+
+#include "utils/errno_define.h"
+
+namespace common {
+// Converts between calendar dates held in std::tm and the YYYYMMDD int32
+// encoding used for DATE values (e.g. 2025-07-09 <-> 20250709).
+//
+// Validation is done with plain proleptic-Gregorian arithmetic rather than
+// std::mktime, so results do not depend on the local time zone or on the
+// platform's time_t range (MSVC's mktime rejects dates before 1970, and a
+// 32-bit time_t rejects dates outside 1901..2038), and every date in the
+// supported range 1000-01-01 .. 9999-12-31 is accepted everywhere.
+class DateConverter {
+   public:
+    /**
+     * Encode the date in @p tm_date as YYYYMMDD.
+     *
+     * Only tm_year, tm_mon and tm_mday are inspected; time-of-day and DST
+     * fields are ignored.
+     *
+     * @param tm_date  date to encode. A tm whose year, month or day field is
+     *                 -1 is the "no date" sentinel written by int_to_date(0).
+     * @param out_int  receives the YYYYMMDD value on success; left untouched
+     *                 on failure.
+     * @return E_OK, or E_INVALID_ARG when the input is the sentinel, the year
+     *         is outside 1000..9999, or the date does not exist (e.g. Feb 30).
+     */
+    static int date_to_int(const std::tm& tm_date, int32_t& out_int) {
+        if (tm_date.tm_year == -1 || tm_date.tm_mon == -1 ||
+            tm_date.tm_mday == -1) {
+            return E_INVALID_ARG;
+        }
+
+        // Widen before adding the offsets so extreme field values cannot
+        // overflow int.
+        const int64_t year = static_cast<int64_t>(tm_date.tm_year) + 1900;
+        const int64_t month = static_cast<int64_t>(tm_date.tm_mon) + 1;
+        const int64_t day = tm_date.tm_mday;
+        if (!is_valid_ymd(year, month, day)) {
+            return E_INVALID_ARG;
+        }
+
+        // year <= 9999 bounds the result by 99991231, well inside int32_t.
+        out_int = static_cast<int32_t>(year * 10000 + month * 100 + day);
+        return E_OK;
+    }
+
+    // True when both tm values have the same year, month and day of month.
+    static bool is_tm_ymd_equal(const std::tm& tm1, const std::tm& tm2) {
+        return tm1.tm_year == tm2.tm_year && tm1.tm_mon == tm2.tm_mon &&
+               tm1.tm_mday == tm2.tm_mday;
+    }
+
+    /**
+     * Decode a YYYYMMDD value into @p out_tm.
+     *
+     * On success out_tm is reset and filled with the date at 12:00, with
+     * tm_wday / tm_yday computed and tm_isdst = -1 (unknown).
+     *
+     * @param date_int  YYYYMMDD value; 0 is the "no date" sentinel, for which
+     *                  tm_year, tm_mon and tm_mday are set to -1.
+     * @param out_tm    destination; left untouched for other invalid input.
+     * @return E_OK, or E_INVALID_ARG for 0, an out-of-range year, or a
+     *         non-existent calendar date.
+     */
+    static int int_to_date(int32_t date_int, std::tm& out_tm) {
+        if (date_int == 0) {
+            out_tm.tm_year = out_tm.tm_mon = out_tm.tm_mday = -1;
+            return E_INVALID_ARG;
+        }
+
+        const int year = date_int / 10000;
+        const int month = (date_int % 10000) / 100;
+        const int day = date_int % 100;
+        if (!is_valid_ymd(year, month, day)) {
+            return E_INVALID_ARG;
+        }
+
+        out_tm = std::tm{};
+        out_tm.tm_year = year - 1900;
+        out_tm.tm_mon = month - 1;
+        out_tm.tm_mday = day;
+        out_tm.tm_hour = 12;
+        out_tm.tm_wday = day_of_week(year, month, day);
+        out_tm.tm_yday = day_of_year(year, month, day);
+        out_tm.tm_isdst = -1;
+        return E_OK;
+    }
+
+   private:
+    static bool is_leap_year(int year) {
+        return (year % 4 == 0 && year % 100 != 0) || year % 400 == 0;
+    }
+
+    // Number of days in `month` (1..12) of `year`.
+    static int days_in_month(int year, int month) {
+        static const int kDaysPerMonth[12] = {31, 28, 31, 30, 31, 30,
+                                              31, 31, 30, 31, 30, 31};
+        return (month == 2 && is_leap_year(year)) ? 29
+                                                  : kDaysPerMonth[month - 1];
+    }
+
+    // True for an existing date with a year in the supported 1000..9999.
+    static bool is_valid_ymd(int64_t year, int64_t month, int64_t day) {
+        if (year < 1000 || year > 9999 || month < 1 || month > 12 ||
+            day < 1) {
+            return false;
+        }
+        return day <= days_in_month(static_cast<int>(year),
+                                    static_cast<int>(month));
+    }
+
+    // Days since January 1 (0..365), the std::tm::tm_yday convention.
+    static int day_of_year(int year, int month, int day) {
+        int yday = day - 1;
+        for (int m = 1; m < month; ++m) {
+            yday += days_in_month(year, m);
+        }
+        return yday;
+    }
+
+    // 0 = Sunday .. 6 = Saturday (std::tm::tm_wday), via Sakamoto's method;
+    // valid for the positive Gregorian years accepted by is_valid_ymd.
+    static int day_of_week(int year, int month, int day) {
+        static const int kMonthOffset[12] = {0, 3, 2, 5, 0, 3,
+                                             5, 1, 4, 6, 2, 4};
+        const int y = (month < 3) ? year - 1 : year;
+        return (y + y / 4 - y / 100 + y / 400 + kMonthOffset[month - 1] +
+                day) %
+               7;
+    }
+};
+} // namespace common
+#endif // COMMON_DATATYPE_DATE_CONVERTER_H
diff --git a/cpp/src/common/datatype/value.h b/cpp/src/common/datatype/value.h
index 29fd5706..8fb77a1e 100644
--- a/cpp/src/common/datatype/value.h
+++ b/cpp/src/common/datatype/value.h
@@ -88,6 +88,7 @@ struct Value {
value_.bval_ = *(bool *)val;
break;
}
+ case common::DATE:
case common::INT32: {
value_.ival_ = *(int32_t *)val;
break;
@@ -104,6 +105,8 @@ struct Value {
value_.dval_ = *(double *)val;
break;
}
+ case common::BLOB:
+ case common::STRING:
case common::TEXT: {
value_.sval_ = strdup((const char *)val);
break;
diff --git a/cpp/src/common/db_common.h b/cpp/src/common/db_common.h
index a9573374..8d6465c6 100644
--- a/cpp/src/common/db_common.h
+++ b/cpp/src/common/db_common.h
@@ -21,6 +21,7 @@
#define COMMON_DB_COMMON_H
#include <iostream>
+#include <unordered_set>
#include "common/allocator/my_string.h"
#include "utils/util_define.h"
@@ -41,6 +42,10 @@ enum TSDataType : uint8_t {
DOUBLE = 4,
TEXT = 5,
VECTOR = 6,
+ UNKNOWN = 7,
+ TIMESTAMP = 8,
+ DATE = 9,
+ BLOB = 10,
STRING = 11,
NULL_TYPE = 254,
INVALID_DATATYPE = 255
@@ -135,14 +140,53 @@ FORCE_INLINE common::TSDataType
GetDataTypeFromTemplateType<common::String>() {
return common::STRING;
}
+// Column data types that a C++ value of type T may be written to, e.g. an
+// int32_t may go into INT32, DATE or INT64 columns. An unsupported T yields
+// {INVALID_DATATYPE}, which never matches a real column type.
+//
+// The sets are constant, so each specialization hands out a reference to a
+// function-local static instead of building (and heap-allocating) a new
+// std::unordered_set on every call; membership checks on per-value paths
+// therefore no longer allocate.
+template <typename T>
+FORCE_INLINE const std::unordered_set<common::TSDataType>&
+GetDataTypesFromTemplateType() {
+    static const std::unordered_set<common::TSDataType> kTypes = {
+        common::INVALID_DATATYPE};
+    return kTypes;
+}
+
+template <>
+FORCE_INLINE const std::unordered_set<common::TSDataType>&
+GetDataTypesFromTemplateType<bool>() {
+    static const std::unordered_set<common::TSDataType> kTypes = {
+        common::BOOLEAN};
+    return kTypes;
+}
+template <>
+FORCE_INLINE const std::unordered_set<common::TSDataType>&
+GetDataTypesFromTemplateType<int32_t>() {
+    static const std::unordered_set<common::TSDataType> kTypes = {
+        common::INT32, common::DATE, common::INT64};
+    return kTypes;
+}
+template <>
+FORCE_INLINE const std::unordered_set<common::TSDataType>&
+GetDataTypesFromTemplateType<int64_t>() {
+    static const std::unordered_set<common::TSDataType> kTypes = {
+        common::INT64, common::TIMESTAMP};
+    return kTypes;
+}
+template <>
+FORCE_INLINE const std::unordered_set<common::TSDataType>&
+GetDataTypesFromTemplateType<float>() {
+    static const std::unordered_set<common::TSDataType> kTypes = {
+        common::FLOAT, common::DOUBLE};
+    return kTypes;
+}
+template <>
+FORCE_INLINE const std::unordered_set<common::TSDataType>&
+GetDataTypesFromTemplateType<double>() {
+    static const std::unordered_set<common::TSDataType> kTypes = {
+        common::DOUBLE};
+    return kTypes;
+}
+template <>
+FORCE_INLINE const std::unordered_set<common::TSDataType>&
+GetDataTypesFromTemplateType<common::String>() {
+    static const std::unordered_set<common::TSDataType> kTypes = {
+        common::STRING, common::TEXT, common::BLOB};
+    return kTypes;
+}
+
FORCE_INLINE size_t get_data_type_size(TSDataType data_type) {
switch (data_type) {
case common::BOOLEAN:
return 1;
+ case common::DATE:
case common::INT32:
case common::FLOAT:
return 4;
case common::INT64:
+ case common::TIMESTAMP:
case common::DOUBLE:
return 8;
default:
diff --git a/cpp/src/common/global.cc b/cpp/src/common/global.cc
index 32b93712..e2004037 100644
--- a/cpp/src/common/global.cc
+++ b/cpp/src/common/global.cc
@@ -59,16 +59,18 @@ extern TSEncoding get_value_encoder(TSDataType data_type) {
case BOOLEAN:
return g_config_value_.boolean_encoding_type_;
case INT32:
+ case DATE:
return g_config_value_.int32_encoding_type_;
case INT64:
+ case TIMESTAMP:
return g_config_value_.int64_encoding_type_;
case FLOAT:
return g_config_value_.float_encoding_type_;
case DOUBLE:
return g_config_value_.double_encoding_type_;
case TEXT:
- return g_config_value_.string_encoding_type_;
case STRING:
+ case BLOB:
return g_config_value_.string_encoding_type_;
case VECTOR:
break;
diff --git a/cpp/src/common/record.h b/cpp/src/common/record.h
index eed72e1f..370508a3 100644
--- a/cpp/src/common/record.h
+++ b/cpp/src/common/record.h
@@ -24,6 +24,7 @@
#include <vector>
#include "common/allocator/my_string.h"
+#include "common/datatype/date_converter.h"
#include "common/db_common.h"
#include "utils/errno_define.h"
@@ -46,7 +47,6 @@ struct TextType {
struct DataPoint {
bool isnull = false;
std::string measurement_name_;
- common::TSDataType data_type_;
union {
bool bool_val_;
int32_t i32_val_;
@@ -58,45 +58,33 @@ struct DataPoint {
TextType text_val_;
DataPoint(const std::string &measurement_name, bool b)
- : measurement_name_(measurement_name),
- data_type_(common::BOOLEAN),
- text_val_() {
+ : measurement_name_(measurement_name), text_val_() {
u_.bool_val_ = b;
}
DataPoint(const std::string &measurement_name, int32_t i32)
- : measurement_name_(measurement_name),
- data_type_(common::INT32),
- text_val_() {
+ : measurement_name_(measurement_name), text_val_() {
u_.i32_val_ = i32;
}
DataPoint(const std::string &measurement_name, int64_t i64)
- : measurement_name_(measurement_name),
- data_type_(common::INT64),
- text_val_() {
+ : measurement_name_(measurement_name), text_val_() {
u_.i64_val_ = i64;
}
DataPoint(const std::string &measurement_name, float f)
- : measurement_name_(measurement_name),
- data_type_(common::FLOAT),
- text_val_() {
+ : measurement_name_(measurement_name), text_val_() {
u_.float_val_ = f;
}
DataPoint(const std::string &measurement_name, double d)
- : measurement_name_(measurement_name),
- data_type_(common::DOUBLE),
- text_val_() {
+ : measurement_name_(measurement_name), text_val_() {
u_.double_val_ = d;
}
DataPoint(const std::string &measurement_name, common::String &str,
common::PageArena &pa)
- : measurement_name_(measurement_name),
- data_type_(common::STRING),
- text_val_() {
+ : measurement_name_(measurement_name), text_val_() {
char *p_buf = (char *)pa.alloc(sizeof(common::String));
u_.str_val_ = new (p_buf) common::String();
u_.str_val_->dup_from(str, pa);
@@ -110,22 +98,18 @@ struct DataPoint {
DataPoint(const std::string &measurement_name)
: isnull(true), measurement_name_(measurement_name) {}
void set_i32(int32_t i32) {
- data_type_ = common::INT32;
u_.i32_val_ = i32;
isnull = false;
}
void set_i64(int64_t i64) {
- data_type_ = common::INT64;
u_.i64_val_ = i64;
isnull = false;
}
void set_float(float f) {
- data_type_ = common::FLOAT;
u_.float_val_ = f;
isnull = false;
}
void set_double(double d) {
- data_type_ = common::DOUBLE;
u_.double_val_ = d;
isnull = false;
}
@@ -165,5 +149,16 @@ inline int TsRecord::add_point(const std::string
&measurement_name,
return ret;
}
+// Adds a DATE point: the calendar date in `val` is encoded as a YYYYMMDD
+// int32 via DateConverter::date_to_int and stored as an int32 data point.
+// Returns E_OK on success; otherwise no point is added and the converter's
+// error code (e.g. E_INVALID_ARG for a non-existent date) is returned.
+template <>
+inline int TsRecord::add_point(const std::string &measurement_name,
+                               std::tm val) {
+    // int32_t (not int) to match date_to_int's out-parameter exactly.
+    int32_t date_int = 0;
+    const int ret = common::DateConverter::date_to_int(val, date_int);
+    if (ret == common::E_OK) {
+        points_.emplace_back(measurement_name, date_int);
+    }
+    return ret;
+}
+
} // end namespace storage
#endif // COMMON_RECORD_H
diff --git a/cpp/src/common/row_record.h b/cpp/src/common/row_record.h
index 90a1405a..5ff5e232 100644
--- a/cpp/src/common/row_record.h
+++ b/cpp/src/common/row_record.h
@@ -23,6 +23,7 @@
#include <vector>
#include "common/allocator/my_string.h"
+#include "common/datatype/date_converter.h"
#include "common/db_common.h"
namespace storage {
@@ -33,13 +34,12 @@ struct Field {
~Field() { free_memory(); }
FORCE_INLINE void free_memory() {
- if (type_ == common::TEXT && value_.sval_ != nullptr) {
- free(value_.sval_);
- value_.sval_ = nullptr;
- }
- if (type_ == common::STRING && value_.strval_ != nullptr) {
- delete value_.strval_;
- value_.strval_ = nullptr;
+ if (type_ == common::BLOB || type_ == common::TEXT ||
+ type_ == common::STRING) {
+ if (value_.strval_ != nullptr) {
+ delete value_.strval_;
+ value_.strval_ = nullptr;
+ }
}
}
@@ -65,10 +65,12 @@ struct Field {
value_.bval_ = *(bool *)val;
break;
}
+ case common::DATE:
case common::INT32: {
value_.ival_ = *(int32_t *)val;
break;
}
+ case common::TIMESTAMP:
case common::INT64: {
value_.lval_ = *(int64_t *)val;
break;
@@ -81,16 +83,14 @@ struct Field {
value_.dval_ = *(double *)val;
break;
}
+ case common::TEXT:
+ case common::BLOB:
case common::STRING: {
value_.strval_ = new common::String();
value_.strval_->dup_from(
std::string(static_cast<char *>(val), len), pa);
break;
}
- // case common::TEXT: {
- // value_.sval_ = strdup(val);
- // break;
- // }
default: {
assert(false);
std::cout << "unknown data type" << std::endl;
@@ -105,14 +105,13 @@ struct Field {
return value_.bval_;
case common::TSDataType::INT32:
return value_.ival_;
+ case common::TSDataType::TIMESTAMP:
case common::TSDataType::INT64:
return value_.lval_;
case common::TSDataType::FLOAT:
return value_.fval_;
case common::TSDataType::DOUBLE:
return value_.dval_;
- // case common::TSDataType::TEXT :
- // return value_.sval_;
default:
std::cout << "unknown data type" << std::endl;
break;
@@ -120,8 +119,18 @@ struct Field {
return -1; // when data type is unknown
}
+    // Returns the stored DATE value decoded into a std::tm via
+    // DateConverter::int_to_date. A non-DATE field yields a value-initialised
+    // (all-zero) std::tm.
+    // NOTE(review): the int_to_date status is ignored, so an undecodable
+    // value comes back as a zeroed or partially filled tm -- confirm callers
+    // can cope with that.
+    FORCE_INLINE std::tm get_date_value() {
+        std::tm decoded{};
+        if (type_ != common::DATE) {
+            return decoded;
+        }
+        common::DateConverter::int_to_date(value_.ival_, decoded);
+        return decoded;
+    }
+
FORCE_INLINE common::String *get_string_value() {
- if (type_ == common::STRING) {
+ if (type_ == common::STRING || type_ == common::TEXT ||
+ type_ == common::BLOB) {
return value_.strval_;
} else {
return nullptr;
@@ -209,6 +218,7 @@ class RowRecord {
FORCE_INLINE void reset() {
for (uint32_t i = 0; i < col_num_; ++i) {
if ((*fields_)[i]->type_ == common::TEXT ||
+ (*fields_)[i]->type_ == common::BLOB ||
(*fields_)[i]->type_ == common::STRING) {
(*fields_)[i]->free_memory();
}
diff --git a/cpp/src/common/statistic.h b/cpp/src/common/statistic.h
index 9b8cf113..d4d31b96 100644
--- a/cpp/src/common/statistic.h
+++ b/cpp/src/common/statistic.h
@@ -96,6 +96,16 @@ namespace storage {
} \
} while (false)
+// Value part of a TEXT statistic update, expanded inside a member function.
+// When nothing has been counted yet (count_ == 0) `value` becomes both the
+// first and the last value; otherwise only last_value_ is replaced. Both
+// copies are made with dup_from into the statistic's arena *pa_. count_ and
+// the time range are not touched here (TEXT_STAT_UPDATE does that).
+// NOTE(review): every call duplicates `value` into pa_ and nothing here
+// releases the previous copy, so arena usage presumably grows with the number
+// of updates until the arena is destroyed -- confirm this is acceptable.
+#define TEXT_VALUE_STAT_UPDATE(value) \
+ do { \
+ if (UNLIKELY(count_ == 0)) { \
+ first_value_.dup_from(value, *pa_); \
+ last_value_.dup_from(value, *pa_); \
+ } else { \
+ last_value_.dup_from(value, *pa_); \
+ } \
+ } while (false)
+
#define NUM_STAT_UPDATE(time, value) \
do { \
/* update time */ \
@@ -114,6 +124,22 @@ namespace storage {
count_++; \
} while (false)
+// Full per-point update for TEXT statistics: widens the time range via
+// TIME_STAT_UPDATE, records first/last value via TEXT_VALUE_STAT_UPDATE, then
+// increments count_. The value update must run before count_++ because it
+// uses count_ == 0 to detect the first point.
+#define TEXT_STAT_UPDATE(time, value) \
+ do { \
+ /* update time */ \
+ TIME_STAT_UPDATE((time)); \
+ /* update string value */ \
+ TEXT_VALUE_STAT_UPDATE((value)); \
+ count_++; \
+ } while (false)
+
+// Per-point update for BLOB statistics: only the time range and count_ are
+// maintained; `value` is accepted for signature parity but not used.
+#define BLOB_STAT_UPDATE(time, value) \
+ do { \
+ /* update time */ \
+ TIME_STAT_UPDATE((time)); \
+ count_++; \
+ } while (false)
+
#define BOOL_STAT_UPDATE(time, value) \
do { \
/* update time */ \
@@ -316,6 +342,66 @@ class Statistic {
return common::E_OK; \
} while (false)
+// Body of merge_with() for TEXT statistics; expands to `return` statements,
+// so it must form the whole body of a function returning int.
+// Returns E_INVALID_ARG for a null argument and E_OK without changes when the
+// other statistic is empty. Otherwise it adds the counts and widens
+// [start_time_, end_time_], taking first_value_ / last_value_ (deep-copied
+// into *pa_) from whichever side has the strictly earlier start / strictly
+// later end; on ties this statistic keeps its own values.
+// The C-style cast performs no runtime type check, so the
+// `typed_stat == nullptr` test cannot fire for a non-null argument.
+#define MERGE_TEXT_STAT_FROM(StatType, untyped_stat) \
+ do { \
+ if (UNLIKELY(untyped_stat == nullptr)) { \
+ return common::E_INVALID_ARG; \
+ } \
+ StatType *typed_stat = (StatType *)(untyped_stat); \
+ if (UNLIKELY(typed_stat == nullptr)) { \
+ return common::E_TYPE_NOT_MATCH; \
+ } \
+ if (UNLIKELY(typed_stat->count_ == 0)) { \
+ return common::E_OK; \
+ } \
+ if (count_ == 0) { \
+ count_ = typed_stat->count_; \
+ start_time_ = typed_stat->start_time_; \
+ end_time_ = typed_stat->end_time_; \
+ first_value_.dup_from(typed_stat->first_value_, *pa_); \
+ last_value_.dup_from(typed_stat->last_value_, *pa_); \
+ } else { \
+ count_ += typed_stat->count_; \
+ if (typed_stat->start_time_ < start_time_) { \
+ start_time_ = typed_stat->start_time_; \
+ first_value_.dup_from(typed_stat->first_value_, *pa_); \
+ } \
+ if (typed_stat->end_time_ > end_time_) { \
+ end_time_ = typed_stat->end_time_; \
+ last_value_.dup_from(typed_stat->last_value_, *pa_); \
+ } \
+ } \
+ return common::E_OK; \
+ } while (false)
+
+// Body of merge_with() for BLOB statistics: same contract as
+// MERGE_TEXT_STAT_FROM but only count_ and the time range are merged, since
+// BLOB statistics keep no values.
+#define MERGE_BLOB_STAT_FROM(StatType, untyped_stat) \
+ do { \
+ if (UNLIKELY(untyped_stat == nullptr)) { \
+ return common::E_INVALID_ARG; \
+ } \
+ StatType *typed_stat = (StatType *)(untyped_stat); \
+ if (UNLIKELY(typed_stat == nullptr)) { \
+ return common::E_TYPE_NOT_MATCH; \
+ } \
+ if (UNLIKELY(typed_stat->count_ == 0)) { \
+ return common::E_OK; \
+ } \
+ if (count_ == 0) { \
+ count_ = typed_stat->count_; \
+ start_time_ = typed_stat->start_time_; \
+ end_time_ = typed_stat->end_time_; \
+ } else { \
+ count_ += typed_stat->count_; \
+ if (typed_stat->start_time_ < start_time_) { \
+ start_time_ = typed_stat->start_time_; \
+ } \
+ if (typed_stat->end_time_ > end_time_) { \
+ end_time_ = typed_stat->end_time_; \
+ } \
+ } \
+ return common::E_OK; \
+ } while (false)
+
#define MERGE_TIME_STAT_FROM(StatType, untyped_stat) \
do { \
if (UNLIKELY(untyped_stat == nullptr)) { \
@@ -401,6 +487,38 @@ class Statistic {
return common::E_OK; \
} while (false)
+// Body of deep_copy_from() for TEXT statistics; expands to `return`
+// statements. Overwrites count_ and the time range with the source's and
+// duplicates first_value_ / last_value_ into this statistic's arena *pa_.
+// Returns E_INVALID_ARG for a null argument, otherwise E_OK.
+#define DEEP_COPY_TEXT_STAT_FROM(StatType, untyped_stat) \
+ do { \
+ if (UNLIKELY(untyped_stat == nullptr)) { \
+ return common::E_INVALID_ARG; \
+ } \
+ StatType *typed_stat = (StatType *)(untyped_stat); \
+ if (UNLIKELY(typed_stat == nullptr)) { \
+ return common::E_TYPE_NOT_MATCH; \
+ } \
+ count_ = typed_stat->count_; \
+ start_time_ = typed_stat->start_time_; \
+ end_time_ = typed_stat->end_time_; \
+ first_value_.dup_from(typed_stat->first_value_, *pa_); \
+ last_value_.dup_from(typed_stat->last_value_, *pa_); \
+ return common::E_OK; \
+ } while (false)
+
+// Body of deep_copy_from() for BLOB statistics: copies only count_ and the
+// time range.
+#define DEEP_COPY_BLOB_STAT_FROM(StatType, untyped_stat) \
+ do { \
+ if (UNLIKELY(untyped_stat == nullptr)) { \
+ return common::E_INVALID_ARG; \
+ } \
+ StatType *typed_stat = (StatType *)(untyped_stat); \
+ if (UNLIKELY(typed_stat == nullptr)) { \
+ return common::E_TYPE_NOT_MATCH; \
+ } \
+ count_ = typed_stat->count_; \
+ start_time_ = typed_stat->start_time_; \
+ end_time_ = typed_stat->end_time_; \
+ return common::E_OK; \
+ } while (false)
+
#define DEEP_COPY_TIME_STAT_FROM(StatType, untyped_stat) \
do { \
if (UNLIKELY(untyped_stat == nullptr)) { \
@@ -577,6 +695,10 @@ class Int32Statistic : public Statistic {
}
};
+// Statistic for DATE columns. DATE values are stored as int32 (YYYYMMDD), so
+// everything is inherited from Int32Statistic; only the reported type differs.
+// get_type() must be public like in the sibling statistic classes (the
+// implicit `class` default made it private).
+class DateStatistic : public Int32Statistic {
+   public:
+    FORCE_INLINE common::TSDataType get_type() { return common::DATE; }
+};
+
class Int64Statistic : public Statistic {
public:
double sum_value_;
@@ -869,6 +991,10 @@ class TimeStatistic : public Statistic {
}
};
+// Statistic for TIMESTAMP columns. TIMESTAMP values are stored as int64, so
+// everything is inherited from Int64Statistic; only the reported type differs.
+// get_type() must be public like in the sibling statistic classes (the
+// implicit `class` default made it private).
+class TimestampStatistics : public Int64Statistic {
+   public:
+    FORCE_INLINE common::TSDataType get_type() { return common::TIMESTAMP; }
+};
+
class StringStatistic : public Statistic {
public:
common::String min_value_;
@@ -955,12 +1081,134 @@ class StringStatistic : public Statistic {
common::PageArena *pa_;
};
+// Statistic for TEXT columns: count, time range and the first/last values
+// seen (no min/max). String payloads are duplicated into the arena pa_.
+class TextStatistic : public Statistic {
+   public:
+    common::String first_value_;
+    common::String last_value_;
+
+    // Allocates its own arena, initialised with init(512, MOD_STATISTIC_OBJ).
+    TextStatistic()
+        : first_value_(), last_value_(), pa_(new common::PageArena()) {
+        pa_->init(512, common::MOD_STATISTIC_OBJ);
+    }
+
+    // Uses the supplied arena for string copies.
+    // NOTE(review): destroy() deletes pa_ whichever constructor was used, so
+    // an arena passed in here is freed by this object as well -- confirm that
+    // callers intend to hand over ownership.
+    TextStatistic(common::PageArena *pa)
+        : first_value_(), last_value_(), pa_(pa) {}
+
+    ~TextStatistic() { destroy(); }
+
+    // Frees the arena and clears the pointer, so repeated calls are harmless.
+    void destroy() {
+        delete pa_;  // deleting a null pointer is a no-op
+        pa_ = nullptr;
+    }
+
+    // Clears counters and values; arena memory is not reclaimed here.
+    FORCE_INLINE void reset() {
+        start_time_ = 0;
+        end_time_ = 0;
+        count_ = 0;
+        first_value_ = common::String();
+        last_value_ = common::String();
+    }
+
+    // Copies counters and deep-copies both values into this object's arena.
+    void clone_from(const TextStatistic &that) {
+        start_time_ = that.start_time_;
+        end_time_ = that.end_time_;
+        count_ = that.count_;
+        first_value_.dup_from(that.first_value_, *pa_);
+        last_value_.dup_from(that.last_value_, *pa_);
+    }
+
+    FORCE_INLINE void update(int64_t time, common::String value) {
+        TEXT_STAT_UPDATE(time, value);
+    }
+
+    FORCE_INLINE common::TSDataType get_type() { return common::TEXT; }
+
+    // Writes first_value_ then last_value_, stopping at the first failure.
+    int serialize_typed_stat(common::ByteStream &out) {
+        int ret = common::E_OK;
+        common::String *fields[] = {&first_value_, &last_value_};
+        for (common::String *field : fields) {
+            if (RET_FAIL(common::SerializationUtil::write_str(*field, out))) {
+                break;
+            }
+        }
+        return ret;
+    }
+
+    // Reads first_value_ then last_value_ (into pa_), stopping at the first
+    // failure.
+    int deserialize_typed_stat(common::ByteStream &in) {
+        int ret = common::E_OK;
+        common::String *fields[] = {&first_value_, &last_value_};
+        for (common::String *field : fields) {
+            if (RET_FAIL(common::SerializationUtil::read_str(*field, pa_, in))) {
+                break;
+            }
+        }
+        return ret;
+    }
+
+    int merge_with(Statistic *stat) {
+        MERGE_TEXT_STAT_FROM(TextStatistic, stat);
+    }
+
+    int deep_copy_from(Statistic *stat) {
+        DEEP_COPY_TEXT_STAT_FROM(TextStatistic, stat);
+    }
+
+   private:
+    common::PageArena *pa_;
+};
+
+// Statistic for BLOB columns. Only count_ and the time range are tracked; no
+// value is stored and nothing typed is (de)serialized, so no arena is needed.
+// Previously the arena-taking constructor left the arena pointer
+// uninitialized and the destructor then deleted that garbage pointer; the
+// unused arena has been removed altogether.
+class BlobStatistic : public Statistic {
+   public:
+    BlobStatistic() = default;
+
+    // The arena argument is accepted for interface parity with the other
+    // statistics (the *_WITH_PA factory paths construct it with one) but is
+    // not used.
+    BlobStatistic(common::PageArena * /*pa*/) {}
+
+    ~BlobStatistic() { destroy(); }
+
+    // Nothing to release; kept so existing callers of destroy() still work.
+    void destroy() {}
+
+    FORCE_INLINE void reset() {
+        count_ = 0;
+        start_time_ = 0;
+        end_time_ = 0;
+    }
+    void clone_from(const BlobStatistic &that) {
+        count_ = that.count_;
+        start_time_ = that.start_time_;
+        end_time_ = that.end_time_;
+    }
+
+    FORCE_INLINE void update(int64_t time, common::String value) {
+        BLOB_STAT_UPDATE(time, value);
+    }
+
+    FORCE_INLINE common::TSDataType get_type() { return common::BLOB; }
+
+    // BLOB statistics carry no typed payload: nothing is written or read.
+    int serialize_typed_stat(common::ByteStream & /*out*/) {
+        return common::E_OK;
+    }
+    int deserialize_typed_stat(common::ByteStream & /*in*/) {
+        return common::E_OK;
+    }
+    int merge_with(Statistic *stat) {
+        MERGE_BLOB_STAT_FROM(BlobStatistic, stat);
+    }
+    int deep_copy_from(Statistic *stat) {
+        DEEP_COPY_BLOB_STAT_FROM(BlobStatistic, stat);
+    }
+};
+
FORCE_INLINE uint32_t get_typed_statistic_sizeof(common::TSDataType type) {
uint32_t ret_size = 0;
switch (type) {
case common::BOOLEAN:
ret_size = sizeof(BooleanStatistic);
break;
+ case common::DATE:
+ ret_size = sizeof(DateStatistic);
+ break;
case common::INT32:
ret_size = sizeof(Int32Statistic);
break;
@@ -977,7 +1225,13 @@ FORCE_INLINE uint32_t
get_typed_statistic_sizeof(common::TSDataType type) {
ret_size = sizeof(StringStatistic);
break;
case common::TEXT:
- ASSERT(false);
+ ret_size = sizeof(TextStatistic);
+ break;
+ case common::BLOB:
+ ret_size = sizeof(BlobStatistic);
+ break;
+ case common::TIMESTAMP:
+ ret_size = sizeof(TimestampStatistics);
break;
case common::VECTOR:
ret_size = sizeof(TimeStatistic);
@@ -996,6 +1250,9 @@ FORCE_INLINE Statistic
*placement_new_statistic(common::TSDataType type,
case common::BOOLEAN:
s = new (buf) BooleanStatistic;
break;
+ case common::DATE:
+ s = new (buf) DateStatistic;
+ break;
case common::INT32:
s = new (buf) Int32Statistic;
break;
@@ -1012,7 +1269,13 @@ FORCE_INLINE Statistic
*placement_new_statistic(common::TSDataType type,
s = new (buf) StringStatistic;
break;
case common::TEXT:
- ASSERT(false);
+ s = new (buf) TextStatistic;
+ break;
+ case common::BLOB:
+ s = new (buf) BlobStatistic;
+ break;
+ case common::TIMESTAMP:
+ s = new (buf) TimestampStatistics;
break;
case common::VECTOR:
s = new (buf) TimeStatistic;
@@ -1041,6 +1304,9 @@ FORCE_INLINE void clone_statistic(Statistic *from,
Statistic *to,
case common::BOOLEAN:
TYPED_CLONE_STATISTIC(BooleanStatistic);
break;
+ case common::DATE:
+ TYPED_CLONE_STATISTIC(DateStatistic);
+ break;
case common::INT32:
TYPED_CLONE_STATISTIC(Int32Statistic);
break;
@@ -1057,7 +1323,13 @@ FORCE_INLINE void clone_statistic(Statistic *from,
Statistic *to,
TYPED_CLONE_STATISTIC(StringStatistic);
break;
case common::TEXT:
- ASSERT(false);
+ TYPED_CLONE_STATISTIC(TextStatistic);
+ break;
+ case common::BLOB:
+ TYPED_CLONE_STATISTIC(BlobStatistic);
+ break;
+ case common::TIMESTAMP:
+ TYPED_CLONE_STATISTIC(TimestampStatistics);
break;
case common::VECTOR:
TYPED_CLONE_STATISTIC(TimeStatistic);
@@ -1100,6 +1372,9 @@ class StatisticFactory {
case common::BOOLEAN:
ALLOC_STATISTIC(BooleanStatistic);
break;
+ case common::DATE:
+ ALLOC_STATISTIC(DateStatistic);
+ break;
case common::INT32:
ALLOC_STATISTIC(Int32Statistic);
break;
@@ -1116,7 +1391,13 @@ class StatisticFactory {
ALLOC_STATISTIC(StringStatistic);
break;
case common::TEXT:
- ASSERT(false);
+ ALLOC_STATISTIC(TextStatistic);
+ break;
+ case common::BLOB:
+ ALLOC_STATISTIC(BlobStatistic);
+ break;
+ case common::TIMESTAMP:
+ ALLOC_STATISTIC(TimestampStatistics);
break;
case common::VECTOR:
ALLOC_STATISTIC(TimeStatistic);
@@ -1151,11 +1432,20 @@ class StatisticFactory {
ALLOC_HEAP_STATISTIC_WITH_PA(StringStatistic);
break;
case common::TEXT:
- ASSERT(false);
+ ALLOC_HEAP_STATISTIC_WITH_PA(TextStatistic);
+ break;
+ case common::BLOB:
+ ALLOC_HEAP_STATISTIC_WITH_PA(BlobStatistic);
+ break;
+ case common::TIMESTAMP:
+ ALLOC_STATISTIC_WITH_PA(TimestampStatistics);
break;
case common::VECTOR:
ALLOC_STATISTIC_WITH_PA(TimeStatistic);
break;
+ case common::DATE:
+ ALLOC_STATISTIC_WITH_PA(DateStatistic);
+ break;
default:
ASSERT(false);
}
diff --git a/cpp/src/common/tablet.cc b/cpp/src/common/tablet.cc
index 2b4fd37e..26be1336 100644
--- a/cpp/src/common/tablet.cc
+++ b/cpp/src/common/tablet.cc
@@ -21,6 +21,7 @@
#include <cstdlib>
+#include "datatype/date_converter.h"
#include "utils/errno_define.h"
using namespace common;
@@ -53,10 +54,12 @@ int Tablet::init() {
value_matrix_[c].bool_data = (bool *)malloc(
get_data_type_size(schema.data_type_) * max_row_num_);
break;
+ case DATE:
case INT32:
value_matrix_[c].int32_data = (int32_t *)malloc(
get_data_type_size(schema.data_type_) * max_row_num_);
break;
+ case TIMESTAMP:
case INT64:
value_matrix_[c].int64_data = (int64_t *)malloc(
get_data_type_size(schema.data_type_) * max_row_num_);
@@ -69,6 +72,8 @@ int Tablet::init() {
value_matrix_[c].double_data = (double *)malloc(
get_data_type_size(schema.data_type_) * max_row_num_);
break;
+ case BLOB:
+ case TEXT:
case STRING: {
value_matrix_[c].string_data =
(common::String *)malloc(sizeof(String) * max_row_num_);
@@ -97,9 +102,11 @@ void Tablet::destroy() {
for (size_t c = 0; c < schema_vec_->size(); c++) {
const MeasurementSchema &schema = schema_vec_->at(c);
switch (schema.data_type_) {
+ case DATE:
case INT32:
free(value_matrix_[c].int32_data);
break;
+ case TIMESTAMP:
case INT64:
free(value_matrix_[c].int64_data);
break;
@@ -112,6 +119,8 @@ void Tablet::destroy() {
case BOOLEAN:
free(value_matrix_[c].bool_data);
break;
+ case BLOB:
+ case TEXT:
case STRING:
free(value_matrix_[c].string_data);
break;
@@ -201,10 +210,12 @@ void Tablet::process_val(uint32_t row_index, uint32_t schema_index, T val) {
(value_matrix_[schema_index].bool_data)[row_index] =
static_cast<bool>(val);
break;
+ case common::DATE:
case common::INT32:
value_matrix_[schema_index].int32_data[row_index] =
static_cast<int32_t>(val);
break;
+ case common::TIMESTAMP:
case common::INT64:
value_matrix_[schema_index].int64_data[row_index] =
static_cast<int64_t>(val);
@@ -234,20 +245,28 @@ int Tablet::add_value(uint32_t row_index, uint32_t schema_index, T val) {
ret = common::E_OUT_OF_RANGE;
} else {
const MeasurementSchema &schema = schema_vec_->at(schema_index);
- if (UNLIKELY(GetDataTypeFromTemplateType<T>() != schema.data_type_)) {
- if (GetDataTypeFromTemplateType<T>() == common::INT32 &&
- schema.data_type_ == common::INT64) {
- process_val(row_index, schema_index,
static_cast<int64_t>(val));
- } else if (GetDataTypeFromTemplateType<T>() == common::FLOAT &&
- schema.data_type_ == common::DOUBLE) {
- process_val(row_index, schema_index, static_cast<double>(val));
- } else {
- ASSERT(false);
- return E_TYPE_NOT_MATCH;
- }
- } else {
- process_val(row_index, schema_index, val);
+ auto dic = GetDataTypesFromTemplateType<T>();
+ if (!GetDataTypesFromTemplateType<T>().count(schema.data_type_)) {
+ return E_TYPE_NOT_MATCH;
}
+ process_val(row_index, schema_index, val);
+ }
+ return ret;
+}
+
+// Stores a calendar date into a DATE column of this tablet.
+//
+// `val` is converted with common::DateConverter::date_to_int and the resulting
+// int32 is written through process_val.
+// Return values:
+//   - err_code_ when the tablet is already in an error state;
+//   - E_OUT_OF_RANGE for an unknown column;
+//   - E_TYPE_NOT_MATCH when the column is not DATE;
+//   - otherwise the status returned by date_to_int.
+template <>
+int Tablet::add_value(uint32_t row_index, uint32_t schema_index, std::tm val) {
+    if (err_code_ != E_OK) {
+        return err_code_;
+    }
+    if (UNLIKELY(schema_index >= schema_vec_->size())) {
+        ASSERT(false);
+        // Bail out here: continuing would let process_val index
+        // value_matrix_[schema_index] out of range.
+        return common::E_OUT_OF_RANGE;
+    }
+    // A std::tm only makes sense for DATE columns; reject other targets the
+    // same way the generic add_value rejects mismatched template types.
+    if (UNLIKELY(schema_vec_->at(schema_index).data_type_ != common::DATE)) {
+        return common::E_TYPE_NOT_MATCH;
+    }
+    int ret = common::E_OK;
+    int32_t date_int = 0;
+    if (RET_SUCC(common::DateConverter::date_to_int(val, date_int))) {
+        process_val(row_index, schema_index, date_int);
     }
     return ret;
 }
diff --git a/cpp/src/common/tsblock/tsblock.cc b/cpp/src/common/tsblock/tsblock.cc
index 4855ea37..c6a675c3 100644
--- a/cpp/src/common/tsblock/tsblock.cc
+++ b/cpp/src/common/tsblock/tsblock.cc
@@ -51,9 +51,11 @@ int TsBlock::init() {
int TsBlock::build_vector(common::TSDataType type, uint32_t row_count) {
Vector *vec;
int ret = 0;
- if (LIKELY(type != common::TEXT && type != common::STRING)) {
+ if (LIKELY(type != common::TEXT && type != common::STRING &&
+ type != common::BLOB)) {
vec = new FixedLengthVector(type, row_count, get_len(type), this);
- } else if (type == common::TEXT || type == common::STRING) {
+ } else if (type == common::TEXT || type == common::STRING ||
+ type == common::BLOB) {
vec = new VariableLengthVector(
type, row_count, DEFAULT_RESERVED_SIZE_OF_TEXT + TEXT_LEN, this);
} else {
diff --git a/cpp/src/common/tsblock/tuple_desc.cc b/cpp/src/common/tsblock/tuple_desc.cc
index be0abf71..f0550eb1 100644
--- a/cpp/src/common/tsblock/tuple_desc.cc
+++ b/cpp/src/common/tsblock/tuple_desc.cc
@@ -28,10 +28,12 @@ uint32_t TupleDesc::get_single_row_len(int *erro_code) {
total_len += sizeof(bool);
break;
}
+ case common::DATE:
case common::INT32: {
total_len += sizeof(int32_t);
break;
}
+ case common::TIMESTAMP:
case common::INT64: {
total_len += sizeof(int64_t);
break;
@@ -44,14 +46,12 @@ uint32_t TupleDesc::get_single_row_len(int *erro_code) {
total_len += sizeof(double);
break;
}
+ case common::TEXT:
+ case common::BLOB:
case common::STRING: {
total_len += DEFAULT_RESERVED_SIZE_OF_STRING + STRING_LEN;
break;
}
- case common::TEXT: {
- total_len += DEFAULT_RESERVED_SIZE_OF_TEXT + TEXT_LEN;
- break;
- }
default: {
// log_err("TsBlock::BuildVector unknown type %d",
// static_cast<int>(column_list_[i].type_));
@@ -68,9 +68,11 @@ uint32_t get_len(TSDataType type) {
case common::BOOLEAN: {
return sizeof(bool);
}
+ case common::DATE:
case common::INT32: {
return sizeof(int32_t);
}
+ case common::TIMESTAMP:
case common::INT64: {
return sizeof(int64_t);
}
diff --git a/cpp/src/encoding/decoder_factory.h b/cpp/src/encoding/decoder_factory.h
index 8918c92e..12fdf766 100644
--- a/cpp/src/encoding/decoder_factory.h
+++ b/cpp/src/encoding/decoder_factory.h
@@ -58,9 +58,10 @@ class DecoderFactory {
if (encoding == common::PLAIN) {
ALLOC_AND_RETURN_DECODER(PlainDecoder);
} else if (encoding == common::GORILLA) {
- if (data_type == common::INT32) {
+ if (data_type == common::INT32 || data_type == common::DATE) {
ALLOC_AND_RETURN_DECODER(IntGorillaDecoder);
- } else if (data_type == common::INT64) {
+ } else if (data_type == common::INT64 ||
+ data_type == common::TIMESTAMP) {
ALLOC_AND_RETURN_DECODER(LongGorillaDecoder);
} else if (data_type == common::FLOAT) {
ALLOC_AND_RETURN_DECODER(FloatGorillaDecoder);
@@ -71,9 +72,10 @@ class DecoderFactory {
return nullptr;
}
} else if (encoding == common::TS_2DIFF) {
- if (data_type == common::INT32) {
+ if (data_type == common::INT32 || data_type == common::DATE) {
ALLOC_AND_RETURN_DECODER(IntTS2DIFFDecoder);
- } else if (data_type == common::INT64) {
+ } else if (data_type == common::INT64 ||
+ data_type == common::TIMESTAMP) {
ALLOC_AND_RETURN_DECODER(LongTS2DIFFDecoder);
} else if (data_type == common::FLOAT) {
ALLOC_AND_RETURN_DECODER(FloatTS2DIFFDecoder);
diff --git a/cpp/src/encoding/encode_utils.h b/cpp/src/encoding/encode_utils.h
index 45c4b047..7cbd2afc 100644
--- a/cpp/src/encoding/encode_utils.h
+++ b/cpp/src/encoding/encode_utils.h
@@ -52,29 +52,33 @@ FORCE_INLINE int32_t number_of_trailing_zeros(int32_t i) {
if (i == 0) {
return 32;
}
- int32_t y;
+ uint32_t x = static_cast<uint32_t>(i);
int32_t n = 31;
- y = i << 16;
+ uint32_t y;
+ y = x << 16;
if (y != 0) {
- n = n - 16;
- i = y;
+ n -= 16;
+ x = y;
}
- y = i << 8;
+
+ y = x << 8;
if (y != 0) {
- n = n - 8;
- i = y;
+ n -= 8;
+ x = y;
}
- y = i << 4;
+
+ y = x << 4;
if (y != 0) {
- n = n - 4;
- i = y;
+ n -= 4;
+ x = y;
}
- y = i << 2;
+
+ y = x << 2;
if (y != 0) {
- n = n - 2;
- i = y;
+ n -= 2;
+ x = y;
}
- return n - (((uint32_t)(i << 1)) >> 31);
+ return n - static_cast<int32_t>((x << 1) >> 31);
}
FORCE_INLINE int32_t number_of_leading_zeros(int64_t i) {
@@ -108,38 +112,37 @@ FORCE_INLINE int32_t number_of_leading_zeros(int64_t i) {
}
FORCE_INLINE int32_t number_of_trailing_zeros(int64_t i) {
- if (i == 0) {
- return 64;
- }
- int32_t x, y;
+ if (i == 0) return 64;
+ uint32_t x, y;
int32_t n = 63;
- y = (int32_t)i;
+ y = static_cast<uint32_t>(i);
if (y != 0) {
- n = n - 32;
+ n -= 32;
x = y;
- } else
- x = (int32_t)(((uint64_t)i) >> 32);
+ } else {
+ x = static_cast<uint32_t>(static_cast<uint64_t>(i) >> 32);
+ }
y = x << 16;
- if (y != 0) {
- n = n - 16;
+ if (y) {
+ n -= 16;
x = y;
}
y = x << 8;
- if (y != 0) {
- n = n - 8;
+ if (y) {
+ n -= 8;
x = y;
}
y = x << 4;
- if (y != 0) {
- n = n - 4;
+ if (y) {
+ n -= 4;
x = y;
}
y = x << 2;
- if (y != 0) {
- n = n - 2;
+ if (y) {
+ n -= 2;
x = y;
}
- return n - (((uint32_t)(x << 1)) >> 31);
+ return n - static_cast<int32_t>((x << 1) >> 31);
}
} // end namespace storage
diff --git a/cpp/src/encoding/encoder_factory.h b/cpp/src/encoding/encoder_factory.h
index 0e582ae3..1f86bb6c 100644
--- a/cpp/src/encoding/encoder_factory.h
+++ b/cpp/src/encoding/encoder_factory.h
@@ -78,14 +78,17 @@ class EncoderFactory {
} else if (encoding == common::DIFF) {
return nullptr;
} else if (encoding == common::TS_2DIFF) {
- if (data_type == common::INT32) {
+ if (data_type == common::INT32 || data_type == common::DATE) {
ALLOC_AND_RETURN_ENCODER(IntTS2DIFFEncoder);
- } else if (data_type == common::INT64) {
+ } else if (data_type == common::INT64 ||
+ data_type == common::TIMESTAMP) {
ALLOC_AND_RETURN_ENCODER(LongTS2DIFFEncoder);
} else if (data_type == common::FLOAT) {
ALLOC_AND_RETURN_ENCODER(FloatTS2DIFFEncoder);
} else if (data_type == common::DOUBLE) {
ALLOC_AND_RETURN_ENCODER(DoubleTS2DIFFEncoder);
+ } else if (data_type == common::TIMESTAMP) {
+ ALLOC_AND_RETURN_ENCODER(LongTS2DIFFEncoder);
} else {
ASSERT(false);
}
@@ -96,7 +99,7 @@ class EncoderFactory {
} else if (encoding == common::REGULAR) {
return nullptr;
} else if (encoding == common::GORILLA) {
- if (data_type == common::INT32) {
+ if (data_type == common::INT32 || data_type == common::DATE) {
ALLOC_AND_RETURN_ENCODER(IntGorillaEncoder);
} else if (data_type == common::INT64) {
ALLOC_AND_RETURN_ENCODER(LongGorillaEncoder);
@@ -104,6 +107,9 @@ class EncoderFactory {
ALLOC_AND_RETURN_ENCODER(FloatGorillaEncoder);
} else if (data_type == common::DOUBLE) {
ALLOC_AND_RETURN_ENCODER(DoubleGorillaEncoder);
+ } else if (data_type == common::INT64 ||
+ data_type == common::TIMESTAMP) {
+ ALLOC_AND_RETURN_ENCODER(LongGorillaEncoder);
} else {
ASSERT(false);
}
diff --git a/cpp/src/reader/aligned_chunk_reader.cc b/cpp/src/reader/aligned_chunk_reader.cc
index 5e1bbe43..8a96ff75 100644
--- a/cpp/src/reader/aligned_chunk_reader.cc
+++ b/cpp/src/reader/aligned_chunk_reader.cc
@@ -210,7 +210,8 @@ int AlignedChunkReader::get_next_page(TsBlock *ret_tsblock,
Filter *filter =
(oneshoot_filter != nullptr ? oneshoot_filter : time_filter_);
if (prev_time_page_not_finish() && prev_value_page_not_finish()) {
- ret = decode_time_value_buf_into_tsblock(ret_tsblock, oneshoot_filter);
+ ret = decode_time_value_buf_into_tsblock(ret_tsblock, oneshoot_filter,
+ &pa);
return ret;
}
if (!prev_time_page_not_finish() && !prev_value_page_not_finish()) {
@@ -236,7 +237,8 @@ int AlignedChunkReader::get_next_page(TsBlock *ret_tsblock,
}
}
if (IS_SUCC(ret)) {
- ret = decode_time_value_buf_into_tsblock(ret_tsblock, oneshoot_filter);
+ ret = decode_time_value_buf_into_tsblock(ret_tsblock, oneshoot_filter,
+ &pa);
}
return ret;
}
@@ -484,10 +486,10 @@ int AlignedChunkReader::decode_cur_value_page_data() {
}
int AlignedChunkReader::decode_time_value_buf_into_tsblock(
- TsBlock *&ret_tsblock, Filter *filter) {
+ TsBlock *&ret_tsblock, Filter *filter, common::PageArena *pa) {
int ret = common::E_OK;
ret = decode_tv_buf_into_tsblock_by_datatype(time_in_, value_in_,
- ret_tsblock, filter);
+ ret_tsblock, filter, pa);
// if we return during @decode_tv_buf_into_tsblock, we should keep
// @uncompressed_buf_ valid until all TV pairs are decoded.
if (ret != E_OVERFLOW) {
@@ -562,40 +564,47 @@ int AlignedChunkReader::i32_DECODE_TYPED_TV_INTO_TSBLOCK(
Filter *filter) {
int ret = E_OK;
uint32_t mask = 1 << 7;
- do {
- int64_t time = 0;
- int32_t value;
- while ((time_decoder_->has_remaining() &&
- value_decoder_->has_remaining()) ||
- (time_in.has_remaining() && value_in.has_remaining())) {
- if (((value_page_col_notnull_bitmap_[cur_value_index / 8] & 0xFF) &
- (mask >> (cur_value_index % 8))) == 0) {
- RET_FAIL(time_decoder_->read_int64(time, time_in));
- continue;
+ int64_t time = 0;
+ int32_t value;
+ while ((time_decoder_->has_remaining() || time_in.has_remaining()) &&
+ (value_decoder_->has_remaining() || value_in.has_remaining())) {
+ cur_value_index++;
+ if (((value_page_col_notnull_bitmap_[cur_value_index / 8] & 0xFF) &
+ (mask >> (cur_value_index % 8))) == 0) {
+ ret = time_decoder_->read_int64(time, time_in);
+ if (ret != E_OK) {
+ break;
}
if (UNLIKELY(!row_appender.add_row())) {
ret = E_OVERFLOW;
break;
- } else if (RET_FAIL(time_decoder_->read_int64(time, time_in))) {
- }
- if (RET_FAIL(value_decoder_->read_int32(value, value_in))) {
- } else if (filter != nullptr && !filter->satisfy(time, value)) {
- row_appender.backoff_add_row();
- continue;
- } else {
- /*std::cout << "decoder: time=" << time << ", value=" << value
- * << std::endl;*/
- row_appender.append(0, (char *)&time, sizeof(time));
- row_appender.append(1, (char *)&value, sizeof(value));
}
+ row_appender.append(0, (char *)&time, sizeof(time));
+ row_appender.append_null(1);
+ continue;
}
- } while (false);
+ if (UNLIKELY(!row_appender.add_row())) {
+ ret = E_OVERFLOW;
+ cur_value_index--;
+ break;
+ } else if (RET_FAIL(time_decoder_->read_int64(time, time_in))) {
+ } else if (RET_FAIL(value_decoder_->read_int32(value, value_in))) {
+ } else if (filter != nullptr && !filter->satisfy(time, value)) {
+ row_appender.backoff_add_row();
+ continue;
+ } else {
+ /*std::cout << "decoder: time=" << time << ", value=" << value
+ * << std::endl;*/
+ row_appender.append(0, (char *)&time, sizeof(time));
+ row_appender.append(1, (char *)&value, sizeof(value));
+ }
+ }
return ret;
}
int AlignedChunkReader::decode_tv_buf_into_tsblock_by_datatype(
ByteStream &time_in, ByteStream &value_in, TsBlock *ret_tsblock,
- Filter *filter) {
+ Filter *filter, common::PageArena *pa) {
int ret = E_OK;
RowAppender row_appender(ret_tsblock);
switch (value_chunk_header_.data_type_) {
@@ -603,10 +612,14 @@ int AlignedChunkReader::decode_tv_buf_into_tsblock_by_datatype(
DECODE_TYPED_TV_INTO_TSBLOCK(bool, boolean, time_in_, value_in_,
row_appender);
break;
+ case common::DATE:
case common::INT32:
- DECODE_TYPED_TV_INTO_TSBLOCK(int32_t, int32, time_in_, value_in_,
- row_appender);
+ // DECODE_TYPED_TV_INTO_TSBLOCK(int32_t, int32, time_in_,
value_in_,
+ // row_appender);
+ ret = i32_DECODE_TYPED_TV_INTO_TSBLOCK(time_in_, value_in_,
+ row_appender, filter);
break;
+ case common::TIMESTAMP:
case common::INT64:
DECODE_TYPED_TV_INTO_TSBLOCK(int64_t, int64, time_in_, value_in_,
row_appender);
@@ -619,6 +632,12 @@ int AlignedChunkReader::decode_tv_buf_into_tsblock_by_datatype(
DECODE_TYPED_TV_INTO_TSBLOCK(double, double, time_in_, value_in_,
row_appender);
break;
+ case common::STRING:
+ case common::BLOB:
+ case common::TEXT:
+ ret = STRING_DECODE_TYPED_TV_INTO_TSBLOCK(
+ time_in, value_in, row_appender, *pa, filter);
+ break;
default:
ret = E_NOT_SUPPORT;
ASSERT(false);
@@ -629,4 +648,28 @@ int AlignedChunkReader::decode_tv_buf_into_tsblock_by_datatype(
return ret;
}
+// Decodes one aligned value page of a variable-length column (STRING, TEXT or
+// BLOB), together with its time page, into `row_appender`.
+//
+// Each remaining timestamp produces one output row unless `filter` rejects it.
+// A row whose bit in value_page_col_notnull_bitmap_ is clear has no encoded
+// value. For such a row only the timestamp is consumed and column 1 is
+// appended as null. This matches i32_DECODE_TYPED_TV_INTO_TSBLOCK and keeps the
+// time and value streams in step when the column contains nulls.
+//
+// Returns E_OVERFLOW when the tsblock is full. In that case cur_value_index is
+// rolled back so the same row is retried on the next call. Otherwise returns
+// the first decoder error, or E_OK. `pa` is handed to read_String, presumably
+// to back the decoded bytes (TODO(review): confirm).
+int AlignedChunkReader::STRING_DECODE_TYPED_TV_INTO_TSBLOCK(
+    ByteStream &time_in, ByteStream &value_in, RowAppender &row_appender,
+    PageArena &pa, Filter *filter) {
+    int ret = E_OK;
+    const uint32_t mask = 1 << 7;
+    int64_t time = 0;
+    common::String value;
+    while (time_decoder_->has_remaining() || time_in.has_remaining()) {
+        cur_value_index++;
+        const bool is_null =
+            ((value_page_col_notnull_bitmap_[cur_value_index / 8] & 0xFF) &
+             (mask >> (cur_value_index % 8))) == 0;
+        if (UNLIKELY(!row_appender.add_row())) {
+            // Nothing has been consumed for this row yet; undo the index bump
+            // so the next call resumes at the same row.
+            cur_value_index--;
+            ret = E_OVERFLOW;
+            break;
+        }
+        if (RET_FAIL(time_decoder_->read_int64(time, time_in))) {
+            break;
+        }
+        if (is_null) {
+            row_appender.append(0, (char *)&time, sizeof(time));
+            row_appender.append_null(1);
+            continue;
+        }
+        ASSERT(value_decoder_->has_remaining() || value_in.has_remaining());
+        if (RET_FAIL(value_decoder_->read_String(value, pa, value_in))) {
+            break;
+        }
+        if (filter != nullptr && !filter->satisfy(time, value)) {
+            row_appender.backoff_add_row();
+            continue;
+        }
+        row_appender.append(0, (char *)&time, sizeof(time));
+        row_appender.append(1, value.buf_, value.len_);
+    }
+    return ret;
+}
+
} // end namespace storage
\ No newline at end of file
diff --git a/cpp/src/reader/aligned_chunk_reader.h b/cpp/src/reader/aligned_chunk_reader.h
index 58898f7d..998409cd 100644
--- a/cpp/src/reader/aligned_chunk_reader.h
+++ b/cpp/src/reader/aligned_chunk_reader.h
@@ -102,7 +102,8 @@ class AlignedChunkReader : public IChunkReader {
int decode_cur_time_page_data();
int decode_cur_value_page_data();
int decode_time_value_buf_into_tsblock(common::TsBlock *&ret_tsblock,
- Filter *filter);
+ Filter *filter,
+ common::PageArena *pa);
bool prev_time_page_not_finish() const {
return (time_decoder_ && time_decoder_->has_remaining()) ||
time_in_.has_remaining();
@@ -116,11 +117,17 @@ class AlignedChunkReader : public IChunkReader {
int decode_tv_buf_into_tsblock_by_datatype(common::ByteStream &time_in,
common::ByteStream &value_in,
common::TsBlock *ret_tsblock,
- Filter *filter);
+ Filter *filter,
+ common::PageArena *pa);
int i32_DECODE_TYPED_TV_INTO_TSBLOCK(common::ByteStream &time_in,
common::ByteStream &value_in,
common::RowAppender &row_appender,
Filter *filter);
+ int STRING_DECODE_TYPED_TV_INTO_TSBLOCK(common::ByteStream &time_in,
+ common::ByteStream &value_in,
+ common::RowAppender &row_appender,
+ common::PageArena &pa,
+ Filter *filter);
private:
ReadFile *read_file_;
diff --git a/cpp/src/reader/bloom_filter.cc b/cpp/src/reader/bloom_filter.cc
index 174791dd..591bb6c8 100644
--- a/cpp/src/reader/bloom_filter.cc
+++ b/cpp/src/reader/bloom_filter.cc
@@ -179,8 +179,10 @@ String BloomFilter::get_entry_string(const String &device_name,
}
memcpy(path_buf, device_name.buf_, device_name.len_);
*(path_buf + device_name.len_) = '.';
- memcpy(path_buf + device_name.len_ + 1, measurement_name.buf_,
- measurement_name.len_);
+ if (measurement_name.buf_) {
+ memcpy(path_buf + device_name.len_ + 1, measurement_name.buf_,
+ measurement_name.len_);
+ }
*(path_buf + device_name.len_ + measurement_name.len_ + 1) = '\0';
ret_str.buf_ = path_buf;
ret_str.len_ = len;
diff --git a/cpp/src/reader/chunk_reader.cc b/cpp/src/reader/chunk_reader.cc
index 6681e0d5..fad6cf39 100644
--- a/cpp/src/reader/chunk_reader.cc
+++ b/cpp/src/reader/chunk_reader.cc
@@ -455,12 +455,14 @@ int ChunkReader::decode_tv_buf_into_tsblock_by_datatype(ByteStream &time_in,
DECODE_TYPED_TV_INTO_TSBLOCK(bool, boolean, time_in_, value_in_,
row_appender);
break;
+ case common::DATE:
case common::INT32:
// DECODE_TYPED_TV_INTO_TSBLOCK(int32_t, int32, time_in_,
value_in_,
// row_appender);
ret = i32_DECODE_TYPED_TV_INTO_TSBLOCK(time_in_, value_in_,
row_appender, filter);
break;
+ case TIMESTAMP:
case common::INT64:
DECODE_TYPED_TV_INTO_TSBLOCK(int64_t, int64, time_in_, value_in_,
row_appender);
@@ -473,6 +475,8 @@ int ChunkReader::decode_tv_buf_into_tsblock_by_datatype(ByteStream &time_in,
DECODE_TYPED_TV_INTO_TSBLOCK(double, double, time_in_, value_in_,
row_appender);
break;
+ case common::TEXT:
+ case common::BLOB:
case common::STRING:
ret = STRING_DECODE_TYPED_TV_INTO_TSBLOCK(
time_in, value_in, row_appender, *pa, filter);
diff --git a/cpp/src/reader/result_set.h b/cpp/src/reader/result_set.h
index fa1ecd0c..b0d45b01 100644
--- a/cpp/src/reader/result_set.h
+++ b/cpp/src/reader/result_set.h
@@ -215,6 +215,22 @@ inline common::String* ResultSet::get_value(uint32_t column_index) {
return row_record->get_field(column_index)->get_string_value();
}
+// Returns the DATE value of the column named `full_name` in the current row,
+// as produced by get_field(...)->get_date_value().
+// The name must be present in index_lookup_; this is only checked by ASSERT.
+template <>
+inline std::tm ResultSet::get_value(const std::string& full_name) {
+    RowRecord* current_row = get_row_record();
+    ASSERT(index_lookup_.count(full_name));
+    const uint32_t column = index_lookup_[full_name];
+    // `column` is unsigned, so only the upper bound needs checking.
+    ASSERT(column < current_row->get_col_num());
+    return current_row->get_field(column)->get_date_value();
+}
+// Returns the DATE value at 1-based `column_index` of the current row.
+// The index is shifted to 0-based before the field lookup.
+template <>
+inline std::tm ResultSet::get_value(uint32_t column_index) {
+    const uint32_t zero_based = column_index - 1;
+    RowRecord* current_row = get_row_record();
+    ASSERT(zero_based < current_row->get_col_num());
+    return current_row->get_field(zero_based)->get_date_value();
+}
+
} // namespace storage
#endif // READER_QUERY_DATA_SET_H
diff --git a/cpp/src/writer/chunk_writer.h b/cpp/src/writer/chunk_writer.h
index 44eb06f5..d12cf22e 100644
--- a/cpp/src/writer/chunk_writer.h
+++ b/cpp/src/writer/chunk_writer.h
@@ -28,12 +28,9 @@
namespace storage {
-#define CW_DO_WRITE_FOR_TYPE(TSDATATYPE) \
+#define CW_DO_WRITE_FOR_TYPE() \
{ \
int ret = common::E_OK; \
- if (UNLIKELY(data_type_ != TSDATATYPE)) { \
- return common::E_TYPE_NOT_MATCH; \
- } \
if (RET_FAIL(page_writer_.write(timestamp, value))) { \
return ret; \
} \
@@ -66,22 +63,44 @@ class ChunkWriter {
void destroy();
FORCE_INLINE int write(int64_t timestamp, bool value) {
- CW_DO_WRITE_FOR_TYPE(common::BOOLEAN);
+ if (UNLIKELY(data_type_ != common::BOOLEAN)) {
+ return common::E_TYPE_NOT_MATCH;
+ }
+ CW_DO_WRITE_FOR_TYPE();
}
FORCE_INLINE int write(int64_t timestamp, int32_t value) {
- CW_DO_WRITE_FOR_TYPE(common::INT32);
+ if (UNLIKELY(data_type_ != common::INT32 &&
+ data_type_ != common::DATE)) {
+ return common::E_TYPE_NOT_MATCH;
+ }
+ CW_DO_WRITE_FOR_TYPE();
}
FORCE_INLINE int write(int64_t timestamp, int64_t value) {
- CW_DO_WRITE_FOR_TYPE(common::INT64);
+ if (UNLIKELY(data_type_ != common::INT64 &&
+ data_type_ != common::TIMESTAMP)) {
+ return common::E_TYPE_NOT_MATCH;
+ }
+ CW_DO_WRITE_FOR_TYPE();
}
FORCE_INLINE int write(int64_t timestamp, float value) {
- CW_DO_WRITE_FOR_TYPE(common::FLOAT);
+ if (UNLIKELY(data_type_ != common::FLOAT)) {
+ return common::E_TYPE_NOT_MATCH;
+ }
+ CW_DO_WRITE_FOR_TYPE();
}
FORCE_INLINE int write(int64_t timestamp, double value) {
- CW_DO_WRITE_FOR_TYPE(common::DOUBLE);
+ if (UNLIKELY(data_type_ != common::DOUBLE)) {
+ return common::E_TYPE_NOT_MATCH;
+ }
+ CW_DO_WRITE_FOR_TYPE();
}
FORCE_INLINE int write(int64_t timestamp, common::String value) {
- CW_DO_WRITE_FOR_TYPE(common::STRING);
+ if (UNLIKELY(data_type_ != common::STRING &&
+ data_type_ != common::TEXT &&
+ data_type_ != common::BLOB)) {
+ return common::E_TYPE_NOT_MATCH;
+ }
+ CW_DO_WRITE_FOR_TYPE();
}
int end_encode_chunk();
diff --git a/cpp/src/writer/page_writer.h b/cpp/src/writer/page_writer.h
index e60cc120..47ed3296 100644
--- a/cpp/src/writer/page_writer.h
+++ b/cpp/src/writer/page_writer.h
@@ -79,10 +79,7 @@ struct PageData {
int ret = common::E_OK;
\
/* std::cout << "page_writer writer: time=" << timestamp << ", value="
\
* << value << std::endl; */
\
- if (UNLIKELY(data_type_ != TSDATATYPE)) {
\
- ret = common::E_TYPE_NOT_MATCH;
\
- } else if (RET_FAIL(
\
- time_encoder_->encode(timestamp, time_out_stream_))) {
\
+ if (RET_FAIL(time_encoder_->encode(timestamp, time_out_stream_))) {
\
} else if (RET_FAIL(
\
value_encoder_->encode(value, value_out_stream_))) {
\
} else {
\
@@ -113,22 +110,44 @@ class PageWriter {
void destroy();
FORCE_INLINE int write(int64_t timestamp, bool value) {
- PW_DO_WRITE_FOR_TYPE(common::BOOLEAN);
+ if (UNLIKELY(data_type_ != common::BOOLEAN)) {
+ return common::E_TYPE_NOT_MATCH;
+ }
+ PW_DO_WRITE_FOR_TYPE();
}
FORCE_INLINE int write(int64_t timestamp, int32_t value) {
- PW_DO_WRITE_FOR_TYPE(common::INT32);
+ if (UNLIKELY(data_type_ != common::INT32 &&
+ data_type_ != common::DATE)) {
+ return common::E_TYPE_NOT_MATCH;
+ }
+ PW_DO_WRITE_FOR_TYPE();
}
FORCE_INLINE int write(int64_t timestamp, int64_t value) {
- PW_DO_WRITE_FOR_TYPE(common::INT64);
+ if (UNLIKELY(data_type_ != common::INT64 &&
+ data_type_ != common::TIMESTAMP)) {
+ return common::E_TYPE_NOT_MATCH;
+ }
+ PW_DO_WRITE_FOR_TYPE();
}
FORCE_INLINE int write(int64_t timestamp, float value) {
- PW_DO_WRITE_FOR_TYPE(common::FLOAT);
+ if (UNLIKELY(data_type_ != common::FLOAT)) {
+ return common::E_TYPE_NOT_MATCH;
+ }
+ PW_DO_WRITE_FOR_TYPE();
}
FORCE_INLINE int write(int64_t timestamp, double value) {
- PW_DO_WRITE_FOR_TYPE(common::DOUBLE);
+ if (UNLIKELY(data_type_ != common::DOUBLE)) {
+ return common::E_TYPE_NOT_MATCH;
+ }
+ PW_DO_WRITE_FOR_TYPE();
}
FORCE_INLINE int write(int64_t timestamp, common::String value) {
- PW_DO_WRITE_FOR_TYPE(common::STRING);
+ if (UNLIKELY(data_type_ != common::STRING &&
+ data_type_ != common::TEXT &&
+ data_type_ != common::BLOB)) {
+ return common::E_TYPE_NOT_MATCH;
+ }
+ PW_DO_WRITE_FOR_TYPE();
}
FORCE_INLINE uint32_t get_point_numer() const { return statistic_->count_;
}
diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc
index 73e4543e..54c1be02 100644
--- a/cpp/src/writer/tsfile_writer.cc
+++ b/cpp/src/writer/tsfile_writer.cc
@@ -329,9 +329,11 @@ int TsFileWriter::do_check_and_prepare_tablet(Tablet &tablet) {
}
template <typename MeasurementNamesGetter>
-int TsFileWriter::do_check_schema(std::shared_ptr<IDeviceID> device_id,
- MeasurementNamesGetter &measurement_names,
- SimpleVector<ChunkWriter *> &chunk_writers) {
+int TsFileWriter::do_check_schema(
+ std::shared_ptr<IDeviceID> device_id,
+ MeasurementNamesGetter &measurement_names,
+ SimpleVector<ChunkWriter *> &chunk_writers,
+ SimpleVector<common::TSDataType> &data_types) {
int ret = E_OK;
DeviceSchemasMapIter dev_it = schemas_.find(device_id);
MeasurementSchemaGroup *device_schema = nullptr;
@@ -346,6 +348,7 @@ int TsFileWriter::do_check_schema(std::shared_ptr<IDeviceID> device_id,
auto ms_iter = msm.find(measurement_names.next());
if (UNLIKELY(ms_iter == msm.end())) {
chunk_writers.push_back(NULL);
+ data_types.push_back(common::NULL_TYPE);
} else {
// In Java we will check data_type. But in C++, no check here.
// Because checks are performed at the chunk layer and page layer
@@ -371,6 +374,7 @@ int TsFileWriter::do_check_schema(std::shared_ptr<IDeviceID> device_id,
} else {
chunk_writers.push_back(ms->chunk_writer_);
}
+ data_types.push_back(ms->data_type_);
}
}
return ret;
@@ -381,7 +385,8 @@ int TsFileWriter::do_check_schema_aligned(
std::shared_ptr<IDeviceID> device_id,
MeasurementNamesGetter &measurement_names,
storage::TimeChunkWriter *&time_chunk_writer,
- common::SimpleVector<storage::ValueChunkWriter *> &value_chunk_writers) {
+ common::SimpleVector<storage::ValueChunkWriter *> &value_chunk_writers,
+ SimpleVector<common::TSDataType> &data_types) {
int ret = E_OK;
auto dev_it = schemas_.find(device_id);
MeasurementSchemaGroup *device_schema = NULL;
@@ -402,6 +407,7 @@ int TsFileWriter::do_check_schema_aligned(
auto ms_iter = msm.find(measurement_names.next());
if (UNLIKELY(ms_iter == msm.end())) {
value_chunk_writers.push_back(NULL);
+ data_types.push_back(common::NULL_TYPE);
} else {
// Here we may check data_type against ms_iter. But in Java
// libtsfile, no check here.
@@ -428,6 +434,7 @@ int TsFileWriter::do_check_schema_aligned(
} else {
value_chunk_writers.push_back(ms->value_chunk_writer_);
}
+ data_types.push_back(ms->data_type_);
}
}
return ret;
@@ -580,10 +587,11 @@ int TsFileWriter::write_record(const TsRecord &record) {
int ret = E_OK;
// std::vector<ChunkWriter*> chunk_writers;
SimpleVector<ChunkWriter *> chunk_writers;
+ SimpleVector<common::TSDataType> data_types;
MeasurementNamesFromRecord mnames_getter(record);
if (RET_FAIL(do_check_schema(
std::make_shared<StringArrayDeviceID>(record.device_id_),
- mnames_getter, chunk_writers))) {
+ mnames_getter, chunk_writers, data_types))) {
return ret;
}
@@ -594,7 +602,8 @@ int TsFileWriter::write_record(const TsRecord &record) {
continue;
}
// ignore point writer failure
- write_point(chunk_writer, record.timestamp_, record.points_[c]);
+ write_point(chunk_writer, record.timestamp_, data_types[c],
+ record.points_[c]);
}
record_count_since_last_flush_++;
@@ -605,11 +614,13 @@ int TsFileWriter::write_record(const TsRecord &record) {
int TsFileWriter::write_record_aligned(const TsRecord &record) {
int ret = E_OK;
SimpleVector<ValueChunkWriter *> value_chunk_writers;
+ SimpleVector<common::TSDataType> data_types;
TimeChunkWriter *time_chunk_writer;
MeasurementNamesFromRecord mnames_getter(record);
if (RET_FAIL(do_check_schema_aligned(
std::make_shared<StringArrayDeviceID>(record.device_id_),
- mnames_getter, time_chunk_writer, value_chunk_writers))) {
+ mnames_getter, time_chunk_writer, value_chunk_writers,
+ data_types))) {
return ret;
}
if (value_chunk_writers.size() != record.points_.size()) {
@@ -622,29 +633,31 @@ int TsFileWriter::write_record_aligned(const TsRecord &record) {
continue;
}
write_point_aligned(value_chunk_writer, record.timestamp_,
- record.points_[c]);
+ data_types[c], record.points_[c]);
}
return ret;
}
int TsFileWriter::write_point(ChunkWriter *chunk_writer, int64_t timestamp,
+ common::TSDataType data_type,
const DataPoint &point) {
- switch (point.data_type_) {
+ switch (data_type) {
case common::BOOLEAN:
return chunk_writer->write(timestamp, point.u_.bool_val_);
+ case common::DATE:
case common::INT32:
return chunk_writer->write(timestamp, point.u_.i32_val_);
+ case common::TIMESTAMP:
case common::INT64:
return chunk_writer->write(timestamp, point.u_.i64_val_);
case common::FLOAT:
return chunk_writer->write(timestamp, point.u_.float_val_);
case common::DOUBLE:
return chunk_writer->write(timestamp, point.u_.double_val_);
+ case common::BLOB:
+ case common::TEXT:
case common::STRING:
return chunk_writer->write(timestamp, *point.u_.str_val_);
- case common::TEXT:
- ASSERT(false);
- return E_OK;
default:
return E_INVALID_DATA_POINT;
}
@@ -652,15 +665,18 @@ int TsFileWriter::write_point(ChunkWriter *chunk_writer, int64_t timestamp,
int TsFileWriter::write_point_aligned(ValueChunkWriter *value_chunk_writer,
int64_t timestamp,
+ common::TSDataType data_type,
const DataPoint &point) {
bool isnull = point.isnull;
- switch (point.data_type_) {
+ switch (data_type) {
case common::BOOLEAN:
return value_chunk_writer->write(timestamp, point.u_.bool_val_,
isnull);
case common::INT32:
+ case common::DATE:
return value_chunk_writer->write(timestamp, point.u_.i32_val_,
isnull);
+ case common::TIMESTAMP:
case common::INT64:
return value_chunk_writer->write(timestamp, point.u_.i64_val_,
isnull);
@@ -670,9 +686,11 @@ int TsFileWriter::write_point_aligned(ValueChunkWriter *value_chunk_writer,
case common::DOUBLE:
return value_chunk_writer->write(timestamp, point.u_.double_val_,
isnull);
+ case common::BLOB:
case common::TEXT:
- ASSERT(false);
- return E_OK;
+ case common::STRING:
+ return value_chunk_writer->write(timestamp, point.u_.str_val_,
+ isnull);
default:
return E_INVALID_DATA_POINT;
}
@@ -682,10 +700,12 @@ int TsFileWriter::write_tablet_aligned(const Tablet &tablet) {
int ret = E_OK;
SimpleVector<ValueChunkWriter *> value_chunk_writers;
TimeChunkWriter *time_chunk_writer = nullptr;
+ SimpleVector<common::TSDataType> data_types;
MeasurementNamesFromTablet mnames_getter(tablet);
if (RET_FAIL(do_check_schema_aligned(
std::make_shared<StringArrayDeviceID>(tablet.insert_target_name_),
- mnames_getter, time_chunk_writer, value_chunk_writers))) {
+ mnames_getter, time_chunk_writer, value_chunk_writers,
+ data_types))) {
return ret;
}
time_write_column(time_chunk_writer, tablet);
@@ -706,10 +726,11 @@ int TsFileWriter::write_tablet_aligned(const Tablet &tablet) {
int TsFileWriter::write_tablet(const Tablet &tablet) {
int ret = E_OK;
SimpleVector<ChunkWriter *> chunk_writers;
+ SimpleVector<common::TSDataType> data_types;
MeasurementNamesFromTablet mnames_getter(tablet);
if (RET_FAIL(do_check_schema(
std::make_shared<StringArrayDeviceID>(tablet.insert_target_name_),
- mnames_getter, chunk_writers))) {
+ mnames_getter, chunk_writers, data_types))) {
return ret;
}
ASSERT(chunk_writers.size() == tablet.get_column_count());
@@ -780,8 +801,9 @@ int TsFileWriter::write_table(Tablet &tablet) {
} else {
MeasurementNamesFromTablet mnames_getter(tablet);
SimpleVector<ChunkWriter *> chunk_writers;
- if (RET_FAIL(
- do_check_schema(device_id, mnames_getter, chunk_writers)))
{
+ SimpleVector<common::TSDataType> data_types;
+ if (RET_FAIL(do_check_schema(device_id, mnames_getter,
+ chunk_writers, data_types))) {
return ret;
}
ASSERT(chunk_writers.size() == tablet.get_column_count());
@@ -884,29 +906,43 @@ int TsFileWriter::value_write_column(ValueChunkWriter *value_chunk_writer,
int64_t *timestamps = tablet.timestamps_;
Tablet::ValueMatrixEntry col_values = tablet.value_matrix_[col_idx];
BitMap &col_notnull_bitmap = tablet.bitmaps_[col_idx];
-
- if (data_type == common::BOOLEAN) {
- ret = write_typed_column(value_chunk_writer, timestamps,
- (bool *)col_values.bool_data,
- col_notnull_bitmap, start_idx, end_idx);
- } else if (data_type == common::INT32) {
- ret = write_typed_column(value_chunk_writer, timestamps,
- (int32_t *)col_values.int32_data,
- col_notnull_bitmap, start_idx, end_idx);
- } else if (data_type == common::INT64) {
- ret = write_typed_column(value_chunk_writer, timestamps,
- (int64_t *)col_values.int64_data,
- col_notnull_bitmap, start_idx, end_idx);
- } else if (data_type == common::FLOAT) {
- ret = write_typed_column(value_chunk_writer, timestamps,
- (float *)col_values.float_data,
- col_notnull_bitmap, start_idx, end_idx);
- } else if (data_type == common::DOUBLE) {
- ret = write_typed_column(value_chunk_writer, timestamps,
- (double *)col_values.double_data,
- col_notnull_bitmap, start_idx, end_idx);
- } else {
- return E_NOT_SUPPORT;
+ switch (data_type) {
+ case common::BOOLEAN:
+ ret = write_typed_column(value_chunk_writer, timestamps,
+ (bool *)col_values.bool_data,
+ col_notnull_bitmap, start_idx, end_idx);
+ break;
+ case common::DATE:
+ case common::INT32:
+ ret = write_typed_column(value_chunk_writer, timestamps,
+ (int32_t *)col_values.int32_data,
+ col_notnull_bitmap, start_idx, end_idx);
+ break;
+ case common::TIMESTAMP:
+ case common::INT64:
+ ret = write_typed_column(value_chunk_writer, timestamps,
+ (int64_t *)col_values.int64_data,
+ col_notnull_bitmap, start_idx, end_idx);
+ break;
+ case common::FLOAT:
+ ret = write_typed_column(value_chunk_writer, timestamps,
+ (float *)col_values.float_data,
+ col_notnull_bitmap, start_idx, end_idx);
+ break;
+ case common::DOUBLE:
+ ret = write_typed_column(value_chunk_writer, timestamps,
+ (double *)col_values.double_data,
+ col_notnull_bitmap, start_idx, end_idx);
+ break;
+ case common::STRING:
+ case common::TEXT:
+ case common::BLOB:
+ ret = write_typed_column(value_chunk_writer, timestamps,
+ (common::String *)col_values.string_data,
+ col_notnull_bitmap, start_idx, end_idx);
+ break;
+ default:
+ ret = E_NOT_SUPPORT;
}
return ret;
}
@@ -1022,6 +1058,16 @@ int TsFileWriter::write_typed_column(ValueChunkWriter *value_chunk_writer,
DO_VALUE_WRITE_TYPED_COLUMN();
}
+// ValueChunkWriter overload for variable-length values: write_column routes
+// STRING, TEXT and BLOB columns here, all backed by common::String storage.
+int TsFileWriter::write_typed_column(ValueChunkWriter *value_chunk_writer,
+ int64_t *timestamps,
+ common::String *col_values,
+ common::BitMap &col_notnull_bitmap,
+ uint32_t start_idx, uint32_t end_idx) {
+ DO_VALUE_WRITE_TYPED_COLUMN();
+}
+
// TODO make sure ret is meaningful to SDK user
int TsFileWriter::flush() {
int ret = E_OK;
diff --git a/cpp/src/writer/tsfile_writer.h b/cpp/src/writer/tsfile_writer.h
index bb264c4c..bad92381 100644
--- a/cpp/src/writer/tsfile_writer.h
+++ b/cpp/src/writer/tsfile_writer.h
@@ -104,11 +104,13 @@ class TsFileWriter {
private:
+ // data_type: declared type of the target column, passed explicitly.
int write_point(storage::ChunkWriter *chunk_writer, int64_t timestamp,
- const DataPoint &point);
+ common::TSDataType data_type, const DataPoint &point);
bool check_chunk_group_empty(MeasurementSchemaGroup *chunk_group,
bool is_aligned);
int write_point_aligned(ValueChunkWriter *value_chunk_writer,
- int64_t timestamp, const DataPoint &point);
+ int64_t timestamp, common::TSDataType data_type,
+ const DataPoint &point);
int flush_chunk_group(MeasurementSchemaGroup *chunk_group, bool is_aligned);
int write_typed_column(storage::ChunkWriter *chunk_writer,
@@ -140,14 +142,16 @@ class TsFileWriter {
int do_check_schema(
std::shared_ptr<IDeviceID> device_id,
MeasurementNamesGetter &measurement_names,
- common::SimpleVector<storage::ChunkWriter *> &chunk_writers);
+ common::SimpleVector<storage::ChunkWriter *> &chunk_writers,
+ common::SimpleVector<common::TSDataType> &data_types);
template <typename MeasurementNamesGetter>
int do_check_schema_aligned(
std::shared_ptr<IDeviceID> device_id,
MeasurementNamesGetter &measurement_names,
storage::TimeChunkWriter *&time_chunk_writer,
- common::SimpleVector<storage::ValueChunkWriter *> &value_chunk_writers);
+ common::SimpleVector<storage::ValueChunkWriter *> &value_chunk_writers,
+ common::SimpleVector<common::TSDataType> &data_types);
int do_check_schema_table(
std::shared_ptr<IDeviceID> device_id, Tablet &tablet,
storage::TimeChunkWriter *&time_chunk_writer,
diff --git a/cpp/src/writer/value_chunk_writer.h b/cpp/src/writer/value_chunk_writer.h
index eef0a56a..3aba7bbc 100644
--- a/cpp/src/writer/value_chunk_writer.h
+++ b/cpp/src/writer/value_chunk_writer.h
@@ -28,12 +28,9 @@
namespace storage {
-#define VCW_DO_WRITE_FOR_TYPE(TSDATATYPE, ISNULL) \
+#define VCW_DO_WRITE_FOR_TYPE(ISNULL) \
{ \
int ret = common::E_OK; \
- if (UNLIKELY(data_type_ != TSDATATYPE)) { \
- return common::E_TYPE_NOT_MATCH; \
- } \
if (RET_FAIL(value_page_writer_.write(timestamp, value, ISNULL))) { \
return ret; \
} \
@@ -66,19 +63,53 @@ class ValueChunkWriter {
void destroy();
FORCE_INLINE int write(int64_t timestamp, bool value, bool isnull) {
- VCW_DO_WRITE_FOR_TYPE(common::BOOLEAN, isnull);
+ if (UNLIKELY(data_type_ != common::BOOLEAN)) {
+ return common::E_TYPE_NOT_MATCH;
+ }
+ VCW_DO_WRITE_FOR_TYPE(isnull);
}
+
+ // DATE columns are written through the int32 overload.
FORCE_INLINE int write(int64_t timestamp, int32_t value, bool isnull) {
- VCW_DO_WRITE_FOR_TYPE(common::INT32, isnull);
+ if (UNLIKELY(data_type_ != common::INT32 &&
+ data_type_ != common::DATE)) {
+ return common::E_TYPE_NOT_MATCH;
+ }
+ VCW_DO_WRITE_FOR_TYPE(isnull);
}
+
+ // TIMESTAMP columns are written through the int64 overload.
FORCE_INLINE int write(int64_t timestamp, int64_t value, bool isnull) {
- VCW_DO_WRITE_FOR_TYPE(common::INT64, isnull);
+ if (UNLIKELY(data_type_ != common::INT64 &&
+ data_type_ != common::TIMESTAMP)) {
+ return common::E_TYPE_NOT_MATCH;
+ }
+ VCW_DO_WRITE_FOR_TYPE(isnull);
}
+
FORCE_INLINE int write(int64_t timestamp, float value, bool isnull) {
- VCW_DO_WRITE_FOR_TYPE(common::FLOAT, isnull);
+ if (UNLIKELY(data_type_ != common::FLOAT)) {
+ return common::E_TYPE_NOT_MATCH;
+ }
+ VCW_DO_WRITE_FOR_TYPE(isnull);
}
+
FORCE_INLINE int write(int64_t timestamp, double value, bool isnull) {
- VCW_DO_WRITE_FOR_TYPE(common::DOUBLE, isnull);
+ if (UNLIKELY(data_type_ != common::DOUBLE)) {
+ return common::E_TYPE_NOT_MATCH;
+ }
+ VCW_DO_WRITE_FOR_TYPE(isnull);
+ }
+
+ // STRING, TEXT and BLOB columns share the common::String representation.
+ FORCE_INLINE int write(int64_t timestamp, common::String value,
+ bool isnull) {
+ if (UNLIKELY(data_type_ != common::STRING &&
+ data_type_ != common::TEXT &&
+ data_type_ != common::BLOB)) {
+ return common::E_TYPE_NOT_MATCH;
+ }
+ VCW_DO_WRITE_FOR_TYPE(isnull);
}
int end_encode_chunk();
diff --git a/cpp/src/writer/value_page_writer.h b/cpp/src/writer/value_page_writer.h
index 9cf44aec..4cf2ffa9 100644
--- a/cpp/src/writer/value_page_writer.h
+++ b/cpp/src/writer/value_page_writer.h
@@ -63,13 +63,9 @@ struct ValuePageData {
}
};
-#define VPW_DO_WRITE_FOR_TYPE(TSDATATYPE, ISNULL) \
+#define VPW_DO_WRITE_FOR_TYPE(ISNULL) \
{ \
int ret = common::E_OK; \
- if (UNLIKELY(data_type_ != TSDATATYPE)) { \
- ret = common::E_TYPE_NOT_MATCH; \
- return ret; \
- } \
if ((size_ / 8) + 1 > col_notnull_bitmap_.size()) { \
col_notnull_bitmap_.push_back(0); \
} \
@@ -109,19 +105,53 @@ class ValuePageWriter {
void destroy();
FORCE_INLINE int write(int64_t timestamp, bool value, bool isnull) {
- VPW_DO_WRITE_FOR_TYPE(common::BOOLEAN, isnull);
+ if (UNLIKELY(data_type_ != common::BOOLEAN)) {
+ return common::E_TYPE_NOT_MATCH;
+ }
+ VPW_DO_WRITE_FOR_TYPE(isnull);
}
+
+ // DATE columns are written through the int32 overload.
FORCE_INLINE int write(int64_t timestamp, int32_t value, bool isnull) {
- VPW_DO_WRITE_FOR_TYPE(common::INT32, isnull);
+ if (UNLIKELY(data_type_ != common::INT32 &&
+ data_type_ != common::DATE)) {
+ return common::E_TYPE_NOT_MATCH;
+ }
+ VPW_DO_WRITE_FOR_TYPE(isnull);
}
+
+ // TIMESTAMP columns are written through the int64 overload.
FORCE_INLINE int write(int64_t timestamp, int64_t value, bool isnull) {
- VPW_DO_WRITE_FOR_TYPE(common::INT64, isnull);
+ if (UNLIKELY(data_type_ != common::INT64 &&
+ data_type_ != common::TIMESTAMP)) {
+ return common::E_TYPE_NOT_MATCH;
+ }
+ VPW_DO_WRITE_FOR_TYPE(isnull);
}
+
FORCE_INLINE int write(int64_t timestamp, float value, bool isnull) {
- VPW_DO_WRITE_FOR_TYPE(common::FLOAT, isnull);
+ if (UNLIKELY(data_type_ != common::FLOAT)) {
+ return common::E_TYPE_NOT_MATCH;
+ }
+ VPW_DO_WRITE_FOR_TYPE(isnull);
}
+
FORCE_INLINE int write(int64_t timestamp, double value, bool isnull) {
- VPW_DO_WRITE_FOR_TYPE(common::DOUBLE, isnull);
+ if (UNLIKELY(data_type_ != common::DOUBLE)) {
+ return common::E_TYPE_NOT_MATCH;
+ }
+ VPW_DO_WRITE_FOR_TYPE(isnull);
+ }
+
+ // STRING, TEXT and BLOB columns share the common::String representation.
+ FORCE_INLINE int write(int64_t timestamp, common::String value,
+ bool isnull) {
+ if (UNLIKELY(data_type_ != common::STRING &&
+ data_type_ != common::TEXT &&
+ data_type_ != common::BLOB)) {
+ return common::E_TYPE_NOT_MATCH;
+ }
+ VPW_DO_WRITE_FOR_TYPE(isnull);
}
FORCE_INLINE uint32_t get_point_numer() const { return statistic_->count_; }
diff --git a/cpp/test/common/datatype/date_converter_test.cc b/cpp/test/common/datatype/date_converter_test.cc
new file mode 100644
index 00000000..ad59d572
--- /dev/null
+++ b/cpp/test/common/datatype/date_converter_test.cc
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "common/datatype/date_converter.h"
+
+#include <gtest/gtest.h>
+
+#include "common/datatype/value.h"
+#include "common/record.h"
+
+namespace common {
+
+class DateConverterTest : public ::testing::Test {
+ protected:
+ void SetUp() override {
+ // Initialize a valid date (2025-07-03)
+ valid_tm_ = {0, 0, 12, 3,
+ 6, 125}; // tm_mday=3, tm_mon=6 (July), tm_year=125 (2025)
+ valid_int_ = 20250703;
+ }
+
+ std::tm valid_tm_{};
+ int32_t valid_int_{};
+};
+
+// Test normal date conversion
+TEST_F(DateConverterTest, DateToIntValidDate) {
+ int32_t result = 0;
+ ASSERT_EQ(DateConverter::date_to_int(valid_tm_, result), common::E_OK);
+ EXPECT_EQ(result, valid_int_);
+}
+
+TEST_F(DateConverterTest, IntToDateValidDate) {
+ std::tm result = {0};
+ ASSERT_EQ(DateConverter::int_to_date(valid_int_, result), common::E_OK);
+ EXPECT_EQ(result.tm_year, valid_tm_.tm_year);
+ EXPECT_EQ(result.tm_mon, valid_tm_.tm_mon);
+ EXPECT_EQ(result.tm_mday, valid_tm_.tm_mday);
+}
+
+// Test round-trip conversion consistency
+TEST_F(DateConverterTest, RoundTripConversion) {
+ std::tm tm_result = {0};
+ int32_t int_result = 0;
+
+ // Forward conversion then backward conversion
+ ASSERT_EQ(DateConverter::date_to_int(valid_tm_, int_result), common::E_OK);
+ ASSERT_EQ(DateConverter::int_to_date(int_result, tm_result), common::E_OK);
+ EXPECT_EQ(tm_result.tm_year, valid_tm_.tm_year);
+ EXPECT_EQ(tm_result.tm_mon, valid_tm_.tm_mon);
+ EXPECT_EQ(tm_result.tm_mday, valid_tm_.tm_mday);
+}
+
+// Test boundary conditions (leap years, month days)
+TEST_F(DateConverterTest, BoundaryConditions) {
+ // Leap day (Feb 29, 2024)
+ std::tm leap_day = {0, 0, 12, 29, 1, 124}; // 2024-02-29
+ int32_t leap_int = 0;
+ EXPECT_EQ(DateConverter::date_to_int(leap_day, leap_int), common::E_OK);
+
+ // Invalid leap day (Feb 29, 2025 - not a leap year)
+ std::tm invalid_leap = {0, 0, 12, 29, 1, 125}; // 2025-02-29
+ EXPECT_EQ(DateConverter::date_to_int(invalid_leap, leap_int),
+ common::E_INVALID_ARG);
+
+ // First and last day of month
+ std::tm first_day = {0, 0, 12, 1, 0, 125}; // 2025-01-01
+ std::tm last_day = {0, 0, 12, 31, 11, 125}; // 2025-12-31
+ EXPECT_EQ(DateConverter::date_to_int(first_day, leap_int), common::E_OK);
+ EXPECT_EQ(DateConverter::date_to_int(last_day, leap_int), common::E_OK);
+}
+
+// Test invalid inputs
+TEST_F(DateConverterTest, InvalidInputs) {
+ std::tm invalid_tm = {0, 0, 12, 32, 6, 125}; // 2025-07-32 (invalid day)
+ int32_t out_int = 0;
+ EXPECT_EQ(DateConverter::date_to_int(invalid_tm, out_int),
+ common::E_INVALID_ARG);
+
+ // Year out of range
+ std::tm year_out_of_range = {0, 0, 12,
+ 3, 6, -901}; // 0999-07-03 (year < 1000)
+ EXPECT_EQ(DateConverter::date_to_int(year_out_of_range, out_int),
+ common::E_INVALID_ARG);
+
+ // Invalid integer format
+ std::tm tm_result = {0};
+ EXPECT_EQ(DateConverter::int_to_date(20251301, tm_result),
+ common::E_INVALID_ARG); // month=13
+ EXPECT_EQ(DateConverter::int_to_date(20250015, tm_result),
+ common::E_INVALID_ARG); // month=0
+}
+
+// Test a zero-initialized tm (fields never set by the caller)
+TEST_F(DateConverterTest, UninitializedFields) {
+ std::tm uninitialized = {
+ 0}; // every field is 0, so tm_mday == 0 (not a valid day of month)
+ uninitialized.tm_year = -1; // year 1899; NOTE(review): confirm which check rejects this
+ int32_t out_int = 0;
+ EXPECT_EQ(DateConverter::date_to_int(uninitialized, out_int),
+ common::E_INVALID_ARG);
+}
+
+} // namespace common
diff --git a/cpp/test/common/record_test.cc b/cpp/test/common/record_test.cc
index 3a5c67bb..808dafe7 100644
--- a/cpp/test/common/record_test.cc
+++ b/cpp/test/common/record_test.cc
@@ -28,63 +28,54 @@ namespace storage {
TEST(DataPointTest, BoolConstructor) {
DataPoint dp("touch_sensor", true);
EXPECT_EQ(dp.measurement_name_, "touch_sensor");
- EXPECT_EQ(dp.data_type_, common::BOOLEAN);
EXPECT_TRUE(dp.u_.bool_val_);
}
TEST(DataPointTest, Int32Constructor) {
DataPoint dp("temperature", int32_t(100));
EXPECT_EQ(dp.measurement_name_, "temperature");
- EXPECT_EQ(dp.data_type_, common::INT32);
EXPECT_EQ(dp.u_.i32_val_, 100);
}
TEST(DataPointTest, Int64Constructor) {
DataPoint dp("temperature", int64_t(100000));
EXPECT_EQ(dp.measurement_name_, "temperature");
- EXPECT_EQ(dp.data_type_, common::INT64);
EXPECT_EQ(dp.u_.i64_val_, 100000);
}
TEST(DataPointTest, FloatConstructor) {
DataPoint dp("temperature", float(36.6));
EXPECT_EQ(dp.measurement_name_, "temperature");
- EXPECT_EQ(dp.data_type_, common::FLOAT);
EXPECT_FLOAT_EQ(dp.u_.float_val_, 36.6);
}
TEST(DataPointTest, DoubleConstructor) {
DataPoint dp("temperature", double(36.6));
EXPECT_EQ(dp.measurement_name_, "temperature");
- EXPECT_EQ(dp.data_type_, common::DOUBLE);
EXPECT_DOUBLE_EQ(dp.u_.double_val_, 36.6);
}
TEST(DataPointTest, SetInt32) {
DataPoint dp("temperature");
dp.set_i32(100);
- EXPECT_EQ(dp.data_type_, common::INT32);
EXPECT_EQ(dp.u_.i32_val_, 100);
}
TEST(DataPointTest, SetInt64) {
DataPoint dp("temperature");
dp.set_i64(100000);
- EXPECT_EQ(dp.data_type_, common::INT64);
EXPECT_EQ(dp.u_.i64_val_, 100000);
}
TEST(DataPointTest, SetFloat) {
DataPoint dp("temperature");
dp.set_float(36.6);
- EXPECT_EQ(dp.data_type_, common::FLOAT);
EXPECT_FLOAT_EQ(dp.u_.float_val_, 36.6);
}
TEST(DataPointTest, SetDouble) {
DataPoint dp("temperature");
dp.set_double(36.6);
- EXPECT_EQ(dp.data_type_, common::DOUBLE);
EXPECT_DOUBLE_EQ(dp.u_.double_val_, 36.6);
}
@@ -106,7 +97,6 @@ TEST(TsRecordTest, AddPoint) {
ts_record.add_point("temperature", 36.6);
ASSERT_EQ(ts_record.points_.size(), 1);
EXPECT_EQ(ts_record.points_[0].measurement_name_, "temperature");
- EXPECT_EQ(ts_record.points_[0].data_type_, common::DOUBLE);
EXPECT_DOUBLE_EQ(ts_record.points_[0].u_.double_val_, 36.6);
}
@@ -119,7 +109,6 @@ TEST(TsRecordTest, LargeQuantities) {
ASSERT_EQ(ts_record.points_.size(), 10000);
for (int i = 0; i < 10000; i++) {
EXPECT_EQ(ts_record.points_[i].measurement_name_, std::to_string(i));
- EXPECT_EQ(ts_record.points_[i].data_type_, common::DOUBLE);
EXPECT_DOUBLE_EQ(ts_record.points_[i].u_.double_val_, 36.6);
}
}
diff --git a/cpp/test/common/row_record_test.cc b/cpp/test/common/row_record_test.cc
index 0ed05019..964d0551 100644
--- a/cpp/test/common/row_record_test.cc
+++ b/cpp/test/common/row_record_test.cc
@@ -36,13 +36,6 @@ TEST(FieldTest, TypeConstructor) {
EXPECT_EQ(field.type_, common::BOOLEAN);
}
-TEST(FieldTest, FreeMemory) {
- Field field(common::TEXT);
- field.value_.sval_ = strdup("test");
- field.free_memory();
- EXPECT_EQ(field.value_.sval_, nullptr);
-}
-
TEST(FieldTest, IsType) {
Field field(common::BOOLEAN);
EXPECT_TRUE(field.is_type(common::BOOLEAN));
@@ -96,14 +89,6 @@ TEST(FieldTest, MakeLiteralDouble) {
delete field;
}
-TEST(FieldTest, MakeLiteralString) {
- char* text = strdup("test");
- Field* field = make_literal(text);
- EXPECT_EQ(field->type_, common::TEXT);
- field->free_memory();
- delete field;
-}
-
TEST(FieldTest, MakeLiteralBool) {
Field* field = make_literal(true);
EXPECT_EQ(field->type_, common::BOOLEAN);
diff --git a/cpp/test/reader/bloom_filter_test.cc b/cpp/test/reader/bloom_filter_test.cc
index d947bf55..7e754df1 100644
--- a/cpp/test/reader/bloom_filter_test.cc
+++ b/cpp/test/reader/bloom_filter_test.cc
@@ -48,7 +48,6 @@ TEST(BloomfilterTest, BloomFilter) {
}
filter.serialize_to(out);
- std::cout << std::endl;
BloomFilter filter2;
filter2.deserialize_from(out);
// ASSERT_EQ(filter, filter2);
diff --git a/cpp/test/writer/table_view/tsfile_writer_table_test.cc b/cpp/test/writer/table_view/tsfile_writer_table_test.cc
index 73573f73..b187a80f 100644
--- a/cpp/test/writer/table_view/tsfile_writer_table_test.cc
+++ b/cpp/test/writer/table_view/tsfile_writer_table_test.cc
@@ -45,8 +45,7 @@ class TsFileWriterTableTest : public ::testing::Test {
mode_t mode = 0666;
write_file_.create(file_name_, flags, mode);
}
- void TearDown() override { /*remove(file_name_.c_str());*/
- }
+ void TearDown() override { remove(file_name_.c_str()); }
std::string file_name_;
WriteFile write_file_;
@@ -770,3 +769,113 @@ TEST_F(TsFileWriterTableTest, WriteDataWithEmptyField) {
reader.destroy_query_data_set(table_result_set);
ASSERT_EQ(reader.close(), common::E_OK);
}
+
+// Round-trips one FIELD column of every supported data type (including the
+// TIMESTAMP, TEXT, BLOB and DATE types) through the table writer and reader.
+TEST_F(TsFileWriterTableTest, MultiDatatypes) {
+ std::vector<MeasurementSchema*> measurement_schemas;
+ std::vector<ColumnCategory> column_categories;
+
+ std::vector<std::string> measurement_names = {
+ "level", "num", "bools", "double", "id", "ts", "text", "blob", "date"};
+ std::vector<common::TSDataType> data_types = {
+ FLOAT, INT64, BOOLEAN, DOUBLE, STRING, TIMESTAMP, TEXT, BLOB, DATE};
+
+ for (size_t i = 0; i < measurement_names.size(); i++) {
+ measurement_schemas.emplace_back(
+ new MeasurementSchema(measurement_names[i], data_types[i]));
+ column_categories.emplace_back(ColumnCategory::FIELD);
+ }
+ auto table_schema =
+ new TableSchema("testTable", measurement_schemas, column_categories);
+ auto tsfile_table_writer =
+ std::make_shared<TsFileTableWriter>(&write_file_, table_schema);
+ int time = 0;
+ Tablet tablet = Tablet(table_schema->get_measurement_names(),
+ table_schema->get_data_types(), 100);
+
+ char* literal = new char[std::strlen("device_id") + 1];
+ std::strcpy(literal, "device_id");
+ String literal_str(literal, std::strlen("device_id"));
+ std::time_t now = std::time(nullptr);
+ std::tm* local_time = std::localtime(&now);
+ std::tm today = {};
+ today.tm_year = local_time->tm_year;
+ today.tm_mon = local_time->tm_mon;
+ today.tm_mday = local_time->tm_mday;
+ for (int i = 0; i < 100; i++) {
+ tablet.add_timestamp(i, static_cast<int64_t>(time++));
+ for (size_t j = 0; j < measurement_schemas.size(); j++) {
+ switch (data_types[j]) {
+ case BOOLEAN:
+ ASSERT_EQ(tablet.add_value(i, j, true), E_OK);
+ break;
+ case INT64:
+ ASSERT_EQ(tablet.add_value(i, j, (int64_t)415412), E_OK);
+ break;
+ case FLOAT:
+ ASSERT_EQ(tablet.add_value(i, j, (float)1.0), E_OK);
+ break;
+ case DOUBLE:
+ ASSERT_EQ(tablet.add_value(i, j, (double)2.0), E_OK);
+ break;
+ case STRING:
+ ASSERT_EQ(tablet.add_value(i, j, literal_str), E_OK);
+ break;
+ case TEXT:
+ ASSERT_EQ(tablet.add_value(i, j, literal_str), E_OK);
+ break;
+ case BLOB:
+ ASSERT_EQ(tablet.add_value(i, j, literal_str), E_OK);
+ break;
+ case TIMESTAMP:
+ ASSERT_EQ(tablet.add_value(i, j, (int64_t)415412), E_OK);
+ break;
+ case DATE:
+ ASSERT_EQ(tablet.add_value(i, j, today), E_OK);
+ break;
+ default:
+ break;
+ }
+ }
+ }
+ ASSERT_EQ(tsfile_table_writer->write_table(tablet), E_OK);
+ ASSERT_EQ(tsfile_table_writer->flush(), E_OK);
+ ASSERT_EQ(tsfile_table_writer->close(), E_OK);
+
+ delete table_schema;
+
+ auto reader = TsFileReader();
+ ASSERT_EQ(reader.open(write_file_.get_file_path()), common::E_OK);
+ ResultSet* ret = nullptr;
+ int ret_value = reader.query("testTable", measurement_names, 0, 100, ret);
+ ASSERT_EQ(common::E_OK, ret_value);
+
+ auto table_result_set = (TableResultSet*)ret;
+ bool has_next = false;
+ int cur_line = 0;
+ while (IS_SUCC(table_result_set->next(has_next)) && has_next) {
+ ASSERT_EQ(table_result_set->get_value<float>(2), (float)1.0);
+ ASSERT_EQ(table_result_set->get_value<int64_t>(3), (int64_t)415412);
+ ASSERT_EQ(table_result_set->get_value<bool>(4), true);
+ ASSERT_EQ(table_result_set->get_value<double>(5), (double)2.0);
+ ASSERT_EQ(table_result_set->get_value<common::String*>(6)->compare(
+ literal_str),
+ 0);
+ ASSERT_EQ(table_result_set->get_value<int64_t>(7), (int64_t)415412);
+ ASSERT_EQ(table_result_set->get_value<common::String*>(8)->compare(
+ literal_str),
+ 0);
+ ASSERT_EQ(table_result_set->get_value<common::String*>(9)->compare(
+ literal_str),
+ 0);
+ ASSERT_TRUE(DateConverter::is_tm_ymd_equal(
+ table_result_set->get_value<std::tm>(10), today));
+ cur_line++;
+ }
+ // Every one of the 100 rows written above must be read back.
+ ASSERT_EQ(cur_line, 100);
+ reader.destroy_query_data_set(table_result_set);
+ ASSERT_EQ(reader.close(), common::E_OK);
+ delete[] literal;
+}
diff --git a/cpp/test/writer/tsfile_writer_test.cc b/cpp/test/writer/tsfile_writer_test.cc
index 94947e3c..be8be1f0 100644
--- a/cpp/test/writer/tsfile_writer_test.cc
+++ b/cpp/test/writer/tsfile_writer_test.cc
@@ -78,7 +78,8 @@ class TsFileWriterTest : public ::testing::Test {
}
static std::string field_to_string(storage::Field *value) {
- if (value->type_ == common::TEXT) {
+ if (value->type_ == common::TEXT || value->type_ == common::STRING ||
+ value->type_ == common::BLOB) {
return std::string(value->value_.sval_);
} else {
std::stringstream ss;
@@ -90,6 +91,7 @@ class TsFileWriterTest : public ::testing::Test {
ss << value->value_.ival_;
break;
case common::INT64:
+ case common::TIMESTAMP:
ss << value->value_.lval_;
break;
case common::FLOAT:
@@ -122,10 +124,10 @@ TEST_F(TsFileWriterTest, WriteDiffDataType) {
common::TSEncoding encoding = common::TSEncoding::PLAIN;
common::CompressionType compression_type =
common::CompressionType::UNCOMPRESSED;
- std::vector<std::string> measurement_names = {"level", "num", "bools",
- "double", "id"};
- std::vector<common::TSDataType> data_types = {FLOAT, INT64, BOOLEAN, DOUBLE,
- STRING};
+ std::vector<std::string> measurement_names = {
+ "level", "num", "bools", "double", "id", "ts", "text", "blob", "date"};
+ std::vector<common::TSDataType> data_types = {
+ FLOAT, INT64, BOOLEAN, DOUBLE, STRING, TIMESTAMP, TEXT, BLOB, DATE};
for (uint32_t i = 0; i < measurement_names.size(); i++) {
std::string measurement_name = measurement_names[i];
common::TSDataType data_type = data_types[i];
@@ -139,6 +141,13 @@ TEST_F(TsFileWriterTest, WriteDiffDataType) {
std::strcpy(literal, "device_id");
String literal_str(literal, std::strlen("device_id"));
+ std::time_t now = std::time(nullptr);
+ std::tm *local_time = std::localtime(&now);
+ std::tm today = {};
+ today.tm_year = local_time->tm_year;
+ today.tm_mon = local_time->tm_mon;
+ today.tm_mday = local_time->tm_mday;
+
int row_num = 100000;
for (int i = 0; i < row_num; ++i) {
TsRecord record(1622505600000 + i * 100, device_name);
@@ -161,6 +170,18 @@ TEST_F(TsFileWriterTest, WriteDiffDataType) {
case STRING:
record.add_point(measurement_name, literal_str);
break;
+ case TEXT:
+ record.add_point(measurement_name, literal_str);
+ break;
+ case BLOB:
+ record.add_point(measurement_name, literal_str);
+ break;
+ case TIMESTAMP:
+ record.add_point(measurement_name, (int64_t)415412);
+ break;
+ case DATE:
+ record.add_point(measurement_name, today);
+ break;
default:
break;
}
@@ -198,6 +219,11 @@ TEST_F(TsFileWriterTest, WriteDiffDataType) {
ASSERT_EQ(qds->get_value<bool>(4), true);
ASSERT_EQ(qds->get_value<double>(5), (double)2.0);
ASSERT_EQ(qds->get_value<common::String *>(6)->compare(literal_str), 0);
+ ASSERT_EQ(qds->get_value<int64_t>(7), (int64_t)415412);
+ ASSERT_EQ(qds->get_value<common::String *>(8)->compare(literal_str), 0);
+ ASSERT_EQ(qds->get_value<common::String *>(9)->compare(literal_str), 0);
+ ASSERT_TRUE(
+ DateConverter::is_tm_ymd_equal(qds->get_value<std::tm>(10), today));
ASSERT_EQ(qds->get_value<float>(measurement_names[0]), (float)1.0);
ASSERT_EQ(qds->get_value<int64_t>(measurement_names[1]),
@@ -207,11 +233,21 @@ TEST_F(TsFileWriterTest, WriteDiffDataType) {
ASSERT_EQ(qds->get_value<common::String *>(measurement_names[4])
->compare(literal_str),
0);
+ ASSERT_EQ(qds->get_value<int64_t>(measurement_names[5]),
+ (int64_t)415412);
+ ASSERT_EQ(qds->get_value<common::String *>(measurement_names[6])
+ ->compare(literal_str),
+ 0);
+ ASSERT_EQ(qds->get_value<common::String *>(measurement_names[7])
+ ->compare(literal_str),
+ 0);
+ ASSERT_TRUE(DateConverter::is_tm_ymd_equal(
+ qds->get_value<std::tm>(measurement_names[8]), today));
} while (true);
delete[] literal;
EXPECT_EQ(cur_record_num, row_num);
reader.destroy_query_data_set(qds);
- reader.close();
+ ASSERT_EQ(reader.close(), E_OK);
}
TEST_F(TsFileWriterTest, RegisterTimeSeries) {