ColinLeeo commented on code in PR #409:
URL: https://github.com/apache/tsfile/pull/409#discussion_r1957279228
##########
cpp/src/common/allocator/my_string.h:
##########
@@ -35,6 +35,13 @@ struct String {
String() : buf_(nullptr), len_(0) {}
String(char *buf, uint32_t len) : buf_(buf), len_(len) {}
+ String(const std::string& str, common::PageArena& pa) : buf_(nullptr),
len_(0) {
+ dup_from(str, pa);
+ }
+ String(const std::string& str) {
+ buf_ = (char*)str.c_str();
+ len_ = str.size();
+ }
Review Comment:
What is this function used for?
##########
cpp/src/common/schema.h:
##########
@@ -20,319 +20,342 @@
#ifndef COMMON_SCHEMA_H
#define COMMON_SCHEMA_H
+#include <algorithm>
#include <map> // use unordered_map instead
#include <memory>
#include <string>
+#include <unordered_map>
#include "common/db_common.h"
#include "writer/time_chunk_writer.h"
#include "writer/value_chunk_writer.h"
namespace storage {
- class ChunkWriter;
-}
+class ChunkWriter;
+class ValueChunkWriter;
+class TimeChunkWriter;
+} // namespace storage
namespace storage {
- /* schema information for one measurement */
- struct MeasurementSchema {
- std::string measurement_name_; // for example: "s1"
- common::TSDataType data_type_;
- common::TSEncoding encoding_;
- common::CompressionType compression_type_;
- storage::ChunkWriter *chunk_writer_;
- ValueChunkWriter *value_chunk_writer_;
- std::map<std::string, std::string> props_;
-
- MeasurementSchema()
- : measurement_name_(),
- data_type_(common::INVALID_DATATYPE),
- encoding_(common::INVALID_ENCODING),
- compression_type_(common::INVALID_COMPRESSION),
- chunk_writer_(nullptr),
- value_chunk_writer_(nullptr) {
- }
-
- MeasurementSchema(const std::string &measurement_name,
- common::TSDataType data_type)
- : measurement_name_(measurement_name),
- data_type_(data_type),
- encoding_(get_default_encoding_for_type(data_type)),
- compression_type_(common::UNCOMPRESSED),
- chunk_writer_(nullptr),
- value_chunk_writer_(nullptr) {
- }
-
- MeasurementSchema(const std::string &measurement_name,
- common::TSDataType data_type, common::TSEncoding
encoding,
- common::CompressionType compression_type)
- : measurement_name_(measurement_name),
- data_type_(data_type),
- encoding_(encoding),
- compression_type_(compression_type),
- chunk_writer_(nullptr),
- value_chunk_writer_(nullptr) {
- }
-
- int serialize_to(common::ByteStream &out) {
- int ret = common::E_OK;
- if (RET_FAIL(
+/* schema information for one measurement */
+struct MeasurementSchema {
+ std::string measurement_name_; // for example: "s1"
+ common::TSDataType data_type_;
+ common::TSEncoding encoding_;
+ common::CompressionType compression_type_;
+ storage::ChunkWriter *chunk_writer_;
+ ValueChunkWriter *value_chunk_writer_;
+ std::map<std::string, std::string> props_;
+
+ MeasurementSchema()
+ : measurement_name_(),
+ data_type_(common::INVALID_DATATYPE),
+ encoding_(common::INVALID_ENCODING),
+ compression_type_(common::INVALID_COMPRESSION),
+ chunk_writer_(nullptr),
+ value_chunk_writer_(nullptr) {}
+
+ MeasurementSchema(const std::string &measurement_name,
+ common::TSDataType data_type)
+ : measurement_name_(measurement_name),
+ data_type_(data_type),
+ encoding_(get_default_encoding_for_type(data_type)),
+ compression_type_(common::UNCOMPRESSED),
+ chunk_writer_(nullptr),
+ value_chunk_writer_(nullptr) {}
+
+ MeasurementSchema(const std::string &measurement_name,
+ common::TSDataType data_type, common::TSEncoding
encoding,
+ common::CompressionType compression_type)
+ : measurement_name_(measurement_name),
+ data_type_(data_type),
+ encoding_(encoding),
+ compression_type_(compression_type),
+ chunk_writer_(nullptr),
+ value_chunk_writer_(nullptr) {}
+
+ int serialize_to(common::ByteStream &out) {
+ int ret = common::E_OK;
+ if (RET_FAIL(
common::SerializationUtil::write_str(measurement_name_, out)))
{
- } else if (RET_FAIL(
- common::SerializationUtil::write_ui8(data_type_, out))) {
- } else if (RET_FAIL(
- common::SerializationUtil::write_ui8(encoding_, out))) {
- } else if (RET_FAIL(common::SerializationUtil::write_ui8(
- compression_type_, out))) {
- }
- if (ret == common::E_OK) {
- if
(RET_FAIL(common::SerializationUtil::write_ui32(props_.size(),
- out))) {
- for (const auto &prop: props_) {
- if (RET_FAIL(common::SerializationUtil::write_str(
+ } else if (RET_FAIL(
+ common::SerializationUtil::write_ui8(data_type_, out)))
{
+ } else if (RET_FAIL(
+ common::SerializationUtil::write_ui8(encoding_, out))) {
+ } else if (RET_FAIL(common::SerializationUtil::write_ui8(
+ compression_type_, out))) {
+ }
+ if (ret == common::E_OK) {
+ if (RET_FAIL(common::SerializationUtil::write_ui32(props_.size(),
+ out))) {
+ for (const auto &prop : props_) {
+ if (RET_FAIL(common::SerializationUtil::write_str(
prop.first, out))) {
- } else if
(RET_FAIL(common::SerializationUtil::write_str(
- prop.second, out))) {
- }
- if (IS_FAIL(ret)) break;
+ } else if (RET_FAIL(common::SerializationUtil::write_str(
+ prop.second, out))) {
}
+ if (IS_FAIL(ret)) break;
}
}
- return ret;
}
+ return ret;
+ }
- int deserialize_from(common::ByteStream &in) {
- int ret = common::E_OK;
- uint8_t data_type = common::TSDataType::INVALID_DATATYPE,
+ int deserialize_from(common::ByteStream &in) {
+ int ret = common::E_OK;
+ uint8_t data_type = common::TSDataType::INVALID_DATATYPE,
encoding = common::TSEncoding::INVALID_ENCODING,
compression_type =
common::CompressionType::INVALID_COMPRESSION;
- if (RET_FAIL(
+ if (RET_FAIL(
common::SerializationUtil::read_str(measurement_name_, in))) {
- } else if (RET_FAIL(
- common::SerializationUtil::read_ui8(data_type, in))) {
- } else if (RET_FAIL(
- common::SerializationUtil::read_ui8(encoding, in))) {
- } else if (RET_FAIL(common::SerializationUtil::read_ui8(
- compression_type, in))) {
- }
- data_type_ = static_cast<common::TSDataType>(data_type);
- encoding_ = static_cast<common::TSEncoding>(encoding);
- compression_type_ =
static_cast<common::CompressionType>(compression_type);
- uint32_t props_size;
- if (ret == common::E_OK) {
- if (RET_FAIL(common::SerializationUtil::read_ui32(props_size,
- in))) {
- for (uint32_t i = 0; i < props_.size(); ++i) {
- std::string key, value;
- if (RET_FAIL(common::SerializationUtil::read_str(
- key, in))) {
- } else if
(RET_FAIL(common::SerializationUtil::read_str(
- value, in))) {
- }
- props_.insert(std::make_pair(key, value));
- if (IS_FAIL(ret)) break;
+ } else if (RET_FAIL(
+ common::SerializationUtil::read_ui8(data_type, in))) {
+ } else if (RET_FAIL(
+ common::SerializationUtil::read_ui8(encoding, in))) {
+ } else if (RET_FAIL(common::SerializationUtil::read_ui8(
+ compression_type, in))) {
+ }
+ data_type_ = static_cast<common::TSDataType>(data_type);
+ encoding_ = static_cast<common::TSEncoding>(encoding);
+ compression_type_ =
+ static_cast<common::CompressionType>(compression_type);
+ uint32_t props_size;
+ if (ret == common::E_OK) {
+ if (RET_FAIL(
+ common::SerializationUtil::read_ui32(props_size, in))) {
+ for (uint32_t i = 0; i < props_.size(); ++i) {
+ std::string key, value;
+ if (RET_FAIL(
+ common::SerializationUtil::read_str(key, in))) {
+ } else if (RET_FAIL(common::SerializationUtil::read_str(
+ value, in))) {
}
+ props_.insert(std::make_pair(key, value));
+ if (IS_FAIL(ret)) break;
}
}
- return ret;
}
- };
+ return ret;
+ }
+};
- typedef std::map<std::string, MeasurementSchema *> MeasurementSchemaMap;
- typedef std::map<std::string, MeasurementSchema *>::iterator
+typedef std::map<std::string, MeasurementSchema *> MeasurementSchemaMap;
+typedef std::map<std::string, MeasurementSchema *>::iterator
MeasurementSchemaMapIter;
- typedef std::pair<MeasurementSchemaMapIter, bool>
+typedef std::pair<MeasurementSchemaMapIter, bool>
MeasurementSchemaMapInsertResult;
- /* schema information for a device */
- struct MeasurementSchemaGroup {
- // measurement_name -> MeasurementSchema
- MeasurementSchemaMap measurement_schema_map_;
- bool is_aligned_ = false;
- TimeChunkWriter *time_chunk_writer_ = nullptr;
- };
-
- enum class ColumnCategory { TAG = 0, FIELD = 1 };
-
- class TableSchema {
- public:
- static void to_lowercase_inplace(std::string &str) {
- std::transform(str.begin(), str.end(), str.begin(),
- [](unsigned char c) -> unsigned char { return
std::tolower(c); });
- }
-
- TableSchema() = default;
-
- TableSchema(const std::string &table_name,
- const std::vector<MeasurementSchema*>
- &column_schemas,
- const std::vector<ColumnCategory> &column_categories)
- : table_name_(table_name),
- column_categories_(column_categories) {
- to_lowercase_inplace(table_name_);
- for (const auto column_schema : column_schemas) {
- if (column_schema != nullptr) {
-
column_schemas_.emplace_back(std::shared_ptr<MeasurementSchema>(column_schema));
- }
- }
- int idx = 0;
- for (const auto &measurement_schema: column_schemas_) {
- to_lowercase_inplace(measurement_schema->measurement_name_);
- column_pos_index_.insert(
- std::make_pair(measurement_schema->measurement_name_,
idx++));
+/* schema information for a device */
+struct MeasurementSchemaGroup {
+ // measurement_name -> MeasurementSchema
+ MeasurementSchemaMap measurement_schema_map_;
+ bool is_aligned_ = false;
+ TimeChunkWriter *time_chunk_writer_ = nullptr;
+};
+
+enum class ColumnCategory { TAG = 0, FIELD = 1 };
+
+class TableSchema {
+ public:
+ static void to_lowercase_inplace(std::string &str) {
+ std::transform(
+ str.begin(), str.end(), str.begin(),
+ [](unsigned char c) -> unsigned char { return std::tolower(c); });
+ }
+
Review Comment:
Move this to private (like to_lower), or move it to utils.
##########
cpp/src/common/tsfile_common.h:
##########
@@ -58,847 +58,850 @@ typedef int64_t TsFileID;
// one page exists in the chunk but we know that fact after we writer
// the first page.
struct PageHeader {
- uint32_t uncompressed_size_;
- uint32_t compressed_size_;
- Statistic *statistic_;
-
- PageHeader()
- : uncompressed_size_(0), compressed_size_(0), statistic_(nullptr) {}
- ~PageHeader() { reset(); }
- void reset() {
- if (statistic_ != nullptr) {
- StatisticFactory::free(statistic_);
- statistic_ = nullptr;
- }
- uncompressed_size_ = 0;
- compressed_size_ = 0;
- }
- int deserialize_from(common::ByteStream &in, bool deserialize_stat,
- common::TSDataType data_type) {
- int ret = common::E_OK;
- if (RET_FAIL(common::SerializationUtil::read_var_uint(
- uncompressed_size_, in))) {
- } else if (RET_FAIL(common::SerializationUtil::read_var_uint(
- compressed_size_, in))) {
- } else if (deserialize_stat) {
- statistic_ = StatisticFactory::alloc_statistic(data_type);
- if (IS_NULL(statistic_)) {
- return common::E_OOM;
- } else if (RET_FAIL(statistic_->deserialize_from(in))) {
- }
- }
- return ret;
- }
-
- /** max page header size without statistics. */
- static int estimat_max_page_header_size_without_statistics() {
- // uncompressedSize, compressedSize
- // because we use unsigned varInt to encode these two integer, each
- // unsigned varInt will cost at most 5 bytes
- return 2 * (4 + 1);
- }
+ uint32_t uncompressed_size_;
+ uint32_t compressed_size_;
+ Statistic *statistic_;
+
+ PageHeader()
+ : uncompressed_size_(0), compressed_size_(0), statistic_(nullptr) {}
+ ~PageHeader() { reset(); }
+ void reset() {
+ if (statistic_ != nullptr) {
+ StatisticFactory::free(statistic_);
+ statistic_ = nullptr;
+ }
+ uncompressed_size_ = 0;
+ compressed_size_ = 0;
+ }
+ int deserialize_from(common::ByteStream &in, bool deserialize_stat,
+ common::TSDataType data_type) {
+ int ret = common::E_OK;
+ if (RET_FAIL(common::SerializationUtil::read_var_uint(
+ uncompressed_size_, in))) {
+ } else if (RET_FAIL(common::SerializationUtil::read_var_uint(
+ compressed_size_, in))) {
+ } else if (deserialize_stat) {
+ statistic_ = StatisticFactory::alloc_statistic(data_type);
+ if (IS_NULL(statistic_)) {
+ return common::E_OOM;
+ } else if (RET_FAIL(statistic_->deserialize_from(in))) {
+ }
+ }
+ return ret;
+ }
+
+ /** max page header size without statistics. */
+ static int estimat_max_page_header_size_without_statistics() {
+ // uncompressedSize, compressedSize
+ // because we use unsigned varInt to encode these two integer, each
+ // unsigned varInt will cost at most 5 bytes
+ return 2 * (4 + 1);
+ }
#ifndef NDEBUG
- friend std::ostream &operator<<(std::ostream &os, const PageHeader &h) {
- os << "{uncompressed_size_=" << h.uncompressed_size_
- << ", compressed_size_=" << h.uncompressed_size_;
- if (h.statistic_ == nullptr) {
- os << ", stat=nil}";
- } else {
- os << ", stat=" << h.statistic_->to_string() << "}";
- }
- return os;
- }
+ friend std::ostream &operator<<(std::ostream &os, const PageHeader &h) {
+ os << "{uncompressed_size_=" << h.uncompressed_size_
+ << ", compressed_size_=" << h.uncompressed_size_;
+ if (h.statistic_ == nullptr) {
+ os << ", stat=nil}";
+ } else {
+ os << ", stat=" << h.statistic_->to_string() << "}";
+ }
+ return os;
+ }
#endif
};
struct ChunkHeader {
- ChunkHeader()
- : measurement_name_(""),
- data_size_(0),
- data_type_(common::INVALID_DATATYPE),
- compression_type_(common::INVALID_COMPRESSION),
- encoding_type_(common::INVALID_ENCODING),
- num_of_pages_(0),
- serialized_size_(0),
- chunk_type_(0) {}
-
- void reset() {
- measurement_name_.clear();
- data_size_ = 0;
- data_type_ = common::INVALID_DATATYPE;
- compression_type_ = common::INVALID_COMPRESSION;
- encoding_type_ = common::INVALID_ENCODING;
- num_of_pages_ = 0;
- serialized_size_ = 0;
- chunk_type_ = 0;
- }
-
- ~ChunkHeader() = default;
-
- int serialize_to(common::ByteStream &out) {
- int ret = common::E_OK;
- if (RET_FAIL(common::SerializationUtil::write_char(chunk_type_, out))) {
- } else if (RET_FAIL(common::SerializationUtil::write_str(
- measurement_name_, out))) {
- } else if (RET_FAIL(common::SerializationUtil::write_var_uint(
- data_size_, out))) {
- } else if (RET_FAIL(common::SerializationUtil::write_char(data_type_,
- out))) {
- } else if (RET_FAIL(common::SerializationUtil::write_char(
- compression_type_, out))) {
- } else if (RET_FAIL(common::SerializationUtil::write_char(
- encoding_type_, out))) {
- }
- return ret;
- }
- int deserialize_from(common::ByteStream &in) {
- int ret = common::E_OK;
- in.mark_read_pos();
- if (RET_FAIL(common::SerializationUtil::read_char(chunk_type_, in))) {
- } else if (RET_FAIL(common::SerializationUtil::read_str(
- measurement_name_, in))) {
- } else if (RET_FAIL(common::SerializationUtil::read_var_uint(data_size_,
- in))) {
- } else if (RET_FAIL(common::SerializationUtil::read_char(
- (char &) data_type_, in))) {
- } else if (RET_FAIL(common::SerializationUtil::read_char(
- (char &) compression_type_, in))) {
- } else if (RET_FAIL(common::SerializationUtil::read_char(
- (char &) encoding_type_, in))) {
- } else {
- serialized_size_ = in.get_mark_len();
- }
- return ret;
- }
+ ChunkHeader()
+ : measurement_name_(""),
+ data_size_(0),
+ data_type_(common::INVALID_DATATYPE),
+ compression_type_(common::INVALID_COMPRESSION),
+ encoding_type_(common::INVALID_ENCODING),
+ num_of_pages_(0),
+ serialized_size_(0),
+ chunk_type_(0) {}
+
+ void reset() {
+ measurement_name_.clear();
+ data_size_ = 0;
+ data_type_ = common::INVALID_DATATYPE;
+ compression_type_ = common::INVALID_COMPRESSION;
+ encoding_type_ = common::INVALID_ENCODING;
+ num_of_pages_ = 0;
+ serialized_size_ = 0;
+ chunk_type_ = 0;
+ }
+
+ ~ChunkHeader() = default;
+
+ int serialize_to(common::ByteStream &out) {
+ int ret = common::E_OK;
+ if (RET_FAIL(common::SerializationUtil::write_char(chunk_type_, out)))
{
+ } else if (RET_FAIL(common::SerializationUtil::write_str(
+ measurement_name_, out))) {
+ } else if (RET_FAIL(common::SerializationUtil::write_var_uint(
+ data_size_, out))) {
+ } else if (RET_FAIL(common::SerializationUtil::write_char(data_type_,
+ out))) {
+ } else if (RET_FAIL(common::SerializationUtil::write_char(
+ compression_type_, out))) {
+ } else if (RET_FAIL(common::SerializationUtil::write_char(
+ encoding_type_, out))) {
+ }
+ return ret;
+ }
+ int deserialize_from(common::ByteStream &in) {
+ int ret = common::E_OK;
+ in.mark_read_pos();
+ if (RET_FAIL(common::SerializationUtil::read_char(chunk_type_, in))) {
+ } else if (RET_FAIL(common::SerializationUtil::read_str(
+ measurement_name_, in))) {
+ } else if
(RET_FAIL(common::SerializationUtil::read_var_uint(data_size_,
+ in))) {
+ } else if (RET_FAIL(common::SerializationUtil::read_char(
+ (char &)data_type_, in))) {
+ } else if (RET_FAIL(common::SerializationUtil::read_char(
+ (char &)compression_type_, in))) {
+ } else if (RET_FAIL(common::SerializationUtil::read_char(
+ (char &)encoding_type_, in))) {
+ } else {
+ serialized_size_ = in.get_mark_len();
+ }
+ return ret;
+ }
#ifndef NDEBUG
- friend std::ostream &operator<<(std::ostream &os, const ChunkHeader &h) {
- os << "{measurement_name=" << h.measurement_name_
- << ", data_size=" << h.data_size_ << ", data_type=" << h.data_type_
- << ", compression_type=" << h.compression_type_
- << ", encoding_type=" << h.encoding_type_
- << ", num_of_pages=" << h.num_of_pages_
- << ", serialized_size=" << h.serialized_size_
- << ", chunk_type=" << (int) h.chunk_type_ << "}";
- return os;
- }
+ friend std::ostream &operator<<(std::ostream &os, const ChunkHeader &h) {
+ os << "{measurement_name=" << h.measurement_name_
+ << ", data_size=" << h.data_size_ << ", data_type=" << h.data_type_
+ << ", compression_type=" << h.compression_type_
+ << ", encoding_type=" << h.encoding_type_
+ << ", num_of_pages=" << h.num_of_pages_
+ << ", serialized_size=" << h.serialized_size_
+ << ", chunk_type=" << (int)h.chunk_type_ << "}";
+ return os;
+ }
#endif
- std::string measurement_name_;
- uint32_t data_size_;
- common::TSDataType data_type_;
- common::CompressionType compression_type_;
- common::TSEncoding encoding_type_;
- int32_t num_of_pages_;
- int32_t serialized_size_; // TODO seems no usage
- char chunk_type_; // TODO give a description here
+ std::string measurement_name_;
+ uint32_t data_size_;
+ common::TSDataType data_type_;
+ common::CompressionType compression_type_;
+ common::TSEncoding encoding_type_;
+ int32_t num_of_pages_;
+ int32_t serialized_size_; // TODO seems no usage
+ char chunk_type_; // TODO give a description here
- static const int MIN_SERIALIZED_SIZE = 7;
+ static const int MIN_SERIALIZED_SIZE = 7;
};
struct ChunkMeta {
- // std::string measurement_name_;
- common::String measurement_name_;
- common::TSDataType data_type_;
- int64_t offset_of_chunk_header_;
- Statistic *statistic_;
- common::TsID ts_id_;
- char mask_;
- common::TSEncoding encoding_;
- common::CompressionType compression_type_;
-
- ChunkMeta()
- : measurement_name_(),
- data_type_(),
- offset_of_chunk_header_(0),
- statistic_(nullptr),
- ts_id_(),
- mask_(0) {}
-
- int init(const common::String &measurement_name,
- common::TSDataType data_type, int64_t offset_of_chunk_header,
- Statistic *stat, const common::TsID &ts_id, char mask,
- common::TSEncoding encoding,
- common::CompressionType compression_type, common::PageArena &pa) {
- // TODO check parameter valid
- measurement_name_.dup_from(measurement_name, pa);
- data_type_ = data_type;
- offset_of_chunk_header_ = offset_of_chunk_header;
- statistic_ = stat;
- ts_id_ = ts_id;
- mask_ = mask;
- encoding_ = encoding;
- compression_type_ = compression_type;
- return common::E_OK;
- }
- FORCE_INLINE void clone_statistic_from(Statistic *stat) {
- clone_statistic(stat, statistic_, data_type_);
- }
- FORCE_INLINE int clone_from(ChunkMeta &that, common::PageArena *pa) {
- int ret = common::E_OK;
- if (RET_FAIL(measurement_name_.dup_from(that.measurement_name_, *pa))) {
- return ret;
- }
- data_type_ = that.data_type_;
- offset_of_chunk_header_ = that.offset_of_chunk_header_;
- if (that.statistic_ != nullptr) {
- statistic_ =
- StatisticFactory::alloc_statistic_with_pa(data_type_, pa);
- if (IS_NULL(statistic_)) {
- return common::E_OOM;
- }
- clone_statistic_from(that.statistic_);
- }
- ts_id_ = that.ts_id_;
- mask_ = that.mask_;
- return ret;
- }
- int serialize_to(common::ByteStream &out, bool serialize_statistic) {
- int ret = common::E_OK;
- if (RET_FAIL(common::SerializationUtil::write_i64(
- offset_of_chunk_header_, out))) {
- } else if (serialize_statistic) {
- ret = statistic_->serialize_to(out);
- }
- return ret;
- }
- int deserialize_from(common::ByteStream &in, bool deserialize_stat,
- common::PageArena *pa) {
- int ret = common::E_OK;
- if (RET_FAIL(common::SerializationUtil::read_i64(
- offset_of_chunk_header_, in))) {
- } else if (deserialize_stat) {
- statistic_ =
- StatisticFactory::alloc_statistic_with_pa(data_type_, pa);
- if (IS_NULL(statistic_)) {
- ret = common::E_OOM;
- } else {
- ret = statistic_->deserialize_from(in);
- }
- }
- return ret;
- }
+ // std::string measurement_name_;
+ common::String measurement_name_;
+ common::TSDataType data_type_;
+ int64_t offset_of_chunk_header_;
+ Statistic *statistic_;
+ common::TsID ts_id_;
+ char mask_;
+ common::TSEncoding encoding_;
+ common::CompressionType compression_type_;
+
+ ChunkMeta()
+ : measurement_name_(),
+ data_type_(),
+ offset_of_chunk_header_(0),
+ statistic_(nullptr),
+ ts_id_(),
+ mask_(0) {}
+
+ int init(const common::String &measurement_name,
+ common::TSDataType data_type, int64_t offset_of_chunk_header,
+ Statistic *stat, const common::TsID &ts_id, char mask,
+ common::TSEncoding encoding,
+ common::CompressionType compression_type, common::PageArena &pa) {
+ // TODO check parameter valid
+ measurement_name_.dup_from(measurement_name, pa);
+ data_type_ = data_type;
+ offset_of_chunk_header_ = offset_of_chunk_header;
+ statistic_ = stat;
+ ts_id_ = ts_id;
+ mask_ = mask;
+ encoding_ = encoding;
+ compression_type_ = compression_type;
+ return common::E_OK;
+ }
+ FORCE_INLINE void clone_statistic_from(Statistic *stat) {
+ clone_statistic(stat, statistic_, data_type_);
+ }
+ FORCE_INLINE int clone_from(ChunkMeta &that, common::PageArena *pa) {
+ int ret = common::E_OK;
+ if (RET_FAIL(measurement_name_.dup_from(that.measurement_name_, *pa)))
{
+ return ret;
+ }
+ data_type_ = that.data_type_;
+ offset_of_chunk_header_ = that.offset_of_chunk_header_;
+ if (that.statistic_ != nullptr) {
+ statistic_ =
+ StatisticFactory::alloc_statistic_with_pa(data_type_, pa);
+ if (IS_NULL(statistic_)) {
+ return common::E_OOM;
+ }
+ clone_statistic_from(that.statistic_);
+ }
+ ts_id_ = that.ts_id_;
+ mask_ = that.mask_;
+ return ret;
+ }
+ int serialize_to(common::ByteStream &out, bool serialize_statistic) {
+ int ret = common::E_OK;
+ if (RET_FAIL(common::SerializationUtil::write_i64(
+ offset_of_chunk_header_, out))) {
+ } else if (serialize_statistic) {
+ ret = statistic_->serialize_to(out);
+ }
+ return ret;
+ }
+ int deserialize_from(common::ByteStream &in, bool deserialize_stat,
+ common::PageArena *pa) {
+ int ret = common::E_OK;
+ if (RET_FAIL(common::SerializationUtil::read_i64(
+ offset_of_chunk_header_, in))) {
+ } else if (deserialize_stat) {
+ statistic_ =
+ StatisticFactory::alloc_statistic_with_pa(data_type_, pa);
+ if (IS_NULL(statistic_)) {
+ ret = common::E_OOM;
+ } else {
+ ret = statistic_->deserialize_from(in);
+ }
+ }
+ return ret;
+ }
#ifndef NDEBUG
- friend std::ostream &operator<<(std::ostream &os, const ChunkMeta &cm) {
- os << "{measurement_name=" << cm.measurement_name_
- << ", data_type=" << cm.data_type_
- << ", offset_of_chunk_header=" << cm.offset_of_chunk_header_
- << ", ts_id=" << cm.ts_id_.to_string()
- << ", mask=" << ((int) cm.mask_);
- if (cm.statistic_ == nullptr) {
- os << ", statistic=nil}";
- } else {
- os << ", statistic=" << cm.statistic_->to_string() << "}";
- }
- return os;
- }
+ friend std::ostream &operator<<(std::ostream &os, const ChunkMeta &cm) {
+ os << "{measurement_name=" << cm.measurement_name_
+ << ", data_type=" << cm.data_type_
+ << ", offset_of_chunk_header=" << cm.offset_of_chunk_header_
+ << ", ts_id=" << cm.ts_id_.to_string()
+ << ", mask=" << ((int)cm.mask_);
+ if (cm.statistic_ == nullptr) {
+ os << ", statistic=nil}";
+ } else {
+ os << ", statistic=" << cm.statistic_->to_string() << "}";
+ }
+ return os;
+ }
#endif
};
struct ChunkGroupMeta {
- std::shared_ptr<IDeviceID> device_id_;
- common::SimpleList<ChunkMeta *> chunk_meta_list_;
-
- explicit ChunkGroupMeta(common::PageArena *pa_ptr)
- : chunk_meta_list_(pa_ptr) {}
-
- FORCE_INLINE int init(std::shared_ptr<IDeviceID> device_id) {
- device_id_ = device_id;
- return 0;
- }
- FORCE_INLINE int push(ChunkMeta *cm) {
- return chunk_meta_list_.push_back(cm);
- }
+ std::shared_ptr<IDeviceID> device_id_;
+ common::SimpleList<ChunkMeta *> chunk_meta_list_;
+
+ explicit ChunkGroupMeta(common::PageArena *pa_ptr)
+ : chunk_meta_list_(pa_ptr) {}
+
+ FORCE_INLINE int init(std::shared_ptr<IDeviceID> device_id) {
+ device_id_ = device_id;
+ return 0;
+ }
+ FORCE_INLINE int push(ChunkMeta *cm) {
+ return chunk_meta_list_.push_back(cm);
+ }
};
class ITimeseriesIndex {
-public:
- ITimeseriesIndex() {}
- ~ITimeseriesIndex() {}
- virtual common::SimpleList<ChunkMeta *> *get_chunk_meta_list() const {
- return nullptr;
- }
- virtual common::SimpleList<ChunkMeta *> *get_time_chunk_meta_list() const {
- return nullptr;
- }
- virtual common::SimpleList<ChunkMeta *> *get_value_chunk_meta_list() const {
- return nullptr;
- }
-
- virtual common::String get_measurement_name() { return common::String(); }
- virtual common::TSDataType get_data_type() const {
- return common::INVALID_DATATYPE;
- }
- virtual Statistic *get_statistic() const { return nullptr; }
+ public:
+ ITimeseriesIndex() {}
+ ~ITimeseriesIndex() {}
+ virtual common::SimpleList<ChunkMeta *> *get_chunk_meta_list() const {
+ return nullptr;
+ }
+ virtual common::SimpleList<ChunkMeta *> *get_time_chunk_meta_list() const {
+ return nullptr;
+ }
+ virtual common::SimpleList<ChunkMeta *> *get_value_chunk_meta_list() const
{
+ return nullptr;
+ }
+
+ virtual common::String get_measurement_name() const {
+ return common::String();
+ }
+ virtual common::TSDataType get_data_type() const {
+ return common::INVALID_DATATYPE;
+ }
+ virtual Statistic *get_statistic() const { return nullptr; }
};
/*
* A TimeseriesIndex may have one or more chunk metas,
* that means we have such a map: <Timeseries, List<ChunkMeta>>.
*/
class TimeseriesIndex : public ITimeseriesIndex {
-public:
- static const uint32_t CHUNK_META_LIST_SERIALIZED_BUF_PAGE_SIZE = 128;
- static const uint32_t PAGE_ARENA_PAGE_SIZE = 256;
- static const common::AllocModID PAGE_ARENA_MOD_ID =
- common::MOD_TIMESERIES_INDEX_OBJ;
-
-public:
- TimeseriesIndex()
- : timeseries_meta_type_((char) 255),
- chunk_meta_list_data_size_(0),
- measurement_name_(),
- ts_id_(),
- data_type_(common::INVALID_DATATYPE),
- statistic_(nullptr),
- statistic_from_pa_(false),
- chunk_meta_list_serialized_buf_(
- CHUNK_META_LIST_SERIALIZED_BUF_PAGE_SIZE, PAGE_ARENA_MOD_ID),
- chunk_meta_list_(nullptr) {
- // page_arena_.init(PAGE_ARENA_PAGE_SIZE, PAGE_ARENA_MOD_ID);
- }
- ~TimeseriesIndex() { destroy(); }
- void destroy() {
- // page_arena_.destroy();
- reset();
- }
- void reset() // FIXME reuse
- {
- timeseries_meta_type_ = 0;
- chunk_meta_list_data_size_ = 0;
- measurement_name_.reset();
- ts_id_.reset();
- data_type_ = common::VECTOR;
- chunk_meta_list_serialized_buf_.reset();
- if (statistic_ != nullptr && !statistic_from_pa_) {
- StatisticFactory::free(statistic_);
- statistic_ = nullptr;
- }
- }
-
- int add_chunk_meta(ChunkMeta *chunk_meta, bool serialize_statistic);
- FORCE_INLINE int set_measurement_name(common::String &measurement_name,
- common::PageArena &pa) {
- return measurement_name_.dup_from(measurement_name, pa);
- }
- FORCE_INLINE void set_measurement_name(common::String &measurement_name) {
- measurement_name_.shallow_copy_from(measurement_name);
- }
- FORCE_INLINE virtual common::String get_measurement_name() {
- return measurement_name_;
- }
- virtual inline common::SimpleList<ChunkMeta *> *get_chunk_meta_list()
- const {
- return chunk_meta_list_;
- }
- FORCE_INLINE void set_ts_meta_type(char ts_meta_type) {
- timeseries_meta_type_ = ts_meta_type;
- }
- FORCE_INLINE void set_data_type(common::TSDataType data_type) {
- data_type_ = data_type;
- }
- FORCE_INLINE virtual common::TSDataType get_data_type() const {
- return data_type_;
- }
- int init_statistic(common::TSDataType data_type) {
- if (statistic_ != nullptr &&
- !statistic_from_pa_) { // clear old statistic
- StatisticFactory::free(statistic_);
- statistic_ = nullptr;
- }
- statistic_ = StatisticFactory::alloc_statistic(data_type);
- if (IS_NULL(statistic_)) {
- return common::E_OOM;
- }
- statistic_->reset();
- return common::E_OK;
- }
- virtual Statistic *get_statistic() const { return statistic_; }
- common::TsID get_ts_id() const { return ts_id_; }
- void set_ts_id(const common::TsID &ts_id) {
- ts_id_ = ts_id;
-
- // TODO for debug only
- if (chunk_meta_list_ != nullptr) {
- common::SimpleList<ChunkMeta *>::Iterator it =
- chunk_meta_list_->begin();
- for (; it != chunk_meta_list_->end(); it++) {
- it.get()->ts_id_ = ts_id;
- }
- }
- }
-
- FORCE_INLINE void finish() {
- chunk_meta_list_data_size_ =
- chunk_meta_list_serialized_buf_.total_size();
- }
-
- int serialize_to(common::ByteStream &out) {
- int ret = common::E_OK;
- if (RET_FAIL(common::SerializationUtil::write_char(
- timeseries_meta_type_, out))) {
- } else if (RET_FAIL(common::SerializationUtil::write_mystring(
- measurement_name_, out))) {
- } else if (RET_FAIL(common::SerializationUtil::write_char(data_type_,
- out))) {
- } else if (RET_FAIL(common::SerializationUtil::write_var_uint(
- chunk_meta_list_data_size_, out))) {
- } else if (RET_FAIL(statistic_->serialize_to(out))) {
- } else if (RET_FAIL(merge_byte_stream(
- out, chunk_meta_list_serialized_buf_))) {
- }
- return ret;
- }
-
- int deserialize_from(common::ByteStream &in, common::PageArena *pa) {
- int ret = common::E_OK;
- if (RET_FAIL(common::SerializationUtil::read_char(timeseries_meta_type_,
- in))) {
- } else if (RET_FAIL(common::SerializationUtil::read_mystring(
- measurement_name_, pa, in))) {
- } else if (RET_FAIL(common::SerializationUtil::read_char(
- (char &) data_type_, in))) {
- } else if (RET_FAIL(common::SerializationUtil::read_var_uint(
- chunk_meta_list_data_size_, in))) {
- } else if (nullptr ==
- (statistic_ = StatisticFactory::alloc_statistic_with_pa(
- data_type_, pa))) {
- ret = common::E_OOM;
- } else if (RET_FAIL(statistic_->deserialize_from(in))) {
- } else {
- statistic_from_pa_ = true;
- void *chunk_meta_list_buf = pa->alloc(sizeof(*chunk_meta_list_));
- if (IS_NULL(chunk_meta_list_buf)) {
- return common::E_OOM;
- }
- const bool deserialize_chunk_meta_statistic =
- (timeseries_meta_type_ & 0x3F); // TODO
- chunk_meta_list_ =
- new(chunk_meta_list_buf) common::SimpleList<ChunkMeta *>(pa);
- uint32_t start_pos = in.read_pos();
- while (IS_SUCC(ret) &&
- in.read_pos() < start_pos + chunk_meta_list_data_size_) {
- void *cm_buf = pa->alloc(sizeof(ChunkMeta));
- if (IS_NULL(cm_buf)) {
- ret = common::E_OOM;
- } else {
- ChunkMeta *cm = new(cm_buf) ChunkMeta;
- cm->measurement_name_.shallow_copy_from(
- this->measurement_name_);
- cm->data_type_ = this->data_type_;
- cm->mask_ = 0; // TODO
- if (RET_FAIL(cm->deserialize_from(
- in, deserialize_chunk_meta_statistic, pa))) {
- } else if (RET_FAIL(chunk_meta_list_->push_back(cm))) {
- }
+ public:
+ static const uint32_t CHUNK_META_LIST_SERIALIZED_BUF_PAGE_SIZE = 128;
+ static const uint32_t PAGE_ARENA_PAGE_SIZE = 256;
+ static const common::AllocModID PAGE_ARENA_MOD_ID =
+ common::MOD_TIMESERIES_INDEX_OBJ;
+
+ public:
+ TimeseriesIndex()
+ : timeseries_meta_type_((char)255),
+ chunk_meta_list_data_size_(0),
+ measurement_name_(),
+ ts_id_(),
+ data_type_(common::INVALID_DATATYPE),
+ statistic_(nullptr),
+ statistic_from_pa_(false),
+ chunk_meta_list_serialized_buf_(
+ CHUNK_META_LIST_SERIALIZED_BUF_PAGE_SIZE, PAGE_ARENA_MOD_ID),
+ chunk_meta_list_(nullptr) {
+ // page_arena_.init(PAGE_ARENA_PAGE_SIZE, PAGE_ARENA_MOD_ID);
+ }
+ ~TimeseriesIndex() { destroy(); }
+ void destroy() {
+ // page_arena_.destroy();
+ reset();
+ }
+ void reset() // FIXME reuse
+ {
+ timeseries_meta_type_ = 0;
+ chunk_meta_list_data_size_ = 0;
+ measurement_name_.reset();
+ ts_id_.reset();
+ data_type_ = common::VECTOR;
+ chunk_meta_list_serialized_buf_.reset();
+ if (statistic_ != nullptr && !statistic_from_pa_) {
+ StatisticFactory::free(statistic_);
+ statistic_ = nullptr;
}
- }
- }
- return ret;
- }
-
- int clone_from(const TimeseriesIndex &that, common::PageArena *pa) {
- int ret = common::E_OK;
- timeseries_meta_type_ = that.timeseries_meta_type_;
- chunk_meta_list_data_size_ = that.chunk_meta_list_data_size_;
- ts_id_ = that.ts_id_;
- data_type_ = that.data_type_;
-
- statistic_ = StatisticFactory::alloc_statistic_with_pa(data_type_, pa);
- if (IS_NULL(statistic_)) {
- return common::E_OOM;
- }
- clone_statistic(that.statistic_, this->statistic_, data_type_);
- statistic_from_pa_ = true;
-
- if (RET_FAIL(measurement_name_.dup_from(that.measurement_name_, *pa))) {
- return ret;
- }
-
- if (that.chunk_meta_list_ != nullptr) {
- void *buf = pa->alloc(sizeof(*chunk_meta_list_));
- if (IS_NULL(buf)) {
- return common::E_OOM;
- }
- chunk_meta_list_ = new(buf) common::SimpleList<ChunkMeta *>(pa);
- common::SimpleList<ChunkMeta *>::Iterator it;
- for (it = that.chunk_meta_list_->begin();
- IS_SUCC(ret) && it != that.chunk_meta_list_->end(); it++) {
- ChunkMeta *cm = it.get();
- void *cm_buf = pa->alloc(sizeof(ChunkMeta));
- if (IS_NULL(cm_buf)) {
- return common::E_OOM;
+ }
+
+ int add_chunk_meta(ChunkMeta *chunk_meta, bool serialize_statistic);
+ FORCE_INLINE int set_measurement_name(common::String &measurement_name,
+ common::PageArena &pa) {
+ return measurement_name_.dup_from(measurement_name, pa);
+ }
+ FORCE_INLINE void set_measurement_name(common::String &measurement_name) {
+ measurement_name_.shallow_copy_from(measurement_name);
+ }
+ FORCE_INLINE virtual common::String get_measurement_name() const {
+ return measurement_name_;
+ }
+ virtual inline common::SimpleList<ChunkMeta *> *get_chunk_meta_list()
+ const {
+ return chunk_meta_list_;
+ }
+ FORCE_INLINE void set_ts_meta_type(char ts_meta_type) {
+ timeseries_meta_type_ = ts_meta_type;
+ }
+ FORCE_INLINE void set_data_type(common::TSDataType data_type) {
+ data_type_ = data_type;
+ }
+ FORCE_INLINE virtual common::TSDataType get_data_type() const {
+ return data_type_;
+ }
+ int init_statistic(common::TSDataType data_type) {
+ if (statistic_ != nullptr &&
+ !statistic_from_pa_) { // clear old statistic
+ StatisticFactory::free(statistic_);
+ statistic_ = nullptr;
+ }
+ statistic_ = StatisticFactory::alloc_statistic(data_type);
+ if (IS_NULL(statistic_)) {
+ return common::E_OOM;
+ }
+ statistic_->reset();
+ return common::E_OK;
+ }
+ virtual Statistic *get_statistic() const { return statistic_; }
+ common::TsID get_ts_id() const { return ts_id_; }
+ void set_ts_id(const common::TsID &ts_id) {
+ ts_id_ = ts_id;
+
+ // TODO for debug only
+ if (chunk_meta_list_ != nullptr) {
+ common::SimpleList<ChunkMeta *>::Iterator it =
+ chunk_meta_list_->begin();
+ for (; it != chunk_meta_list_->end(); it++) {
+ it.get()->ts_id_ = ts_id;
+ }
+ }
+ }
+
+ FORCE_INLINE void finish() {
+ chunk_meta_list_data_size_ =
+ chunk_meta_list_serialized_buf_.total_size();
+ }
+
+ int serialize_to(common::ByteStream &out) {
+ int ret = common::E_OK;
+ if (RET_FAIL(common::SerializationUtil::write_char(
+ timeseries_meta_type_, out))) {
+ } else if (RET_FAIL(common::SerializationUtil::write_mystring(
+ measurement_name_, out))) {
+ } else if (RET_FAIL(common::SerializationUtil::write_char(data_type_,
+ out))) {
+ } else if (RET_FAIL(common::SerializationUtil::write_var_uint(
+ chunk_meta_list_data_size_, out))) {
+ } else if (RET_FAIL(statistic_->serialize_to(out))) {
+ } else if (RET_FAIL(merge_byte_stream(
+ out, chunk_meta_list_serialized_buf_))) {
+ }
+ return ret;
+ }
+
+ int deserialize_from(common::ByteStream &in, common::PageArena *pa) {
+ int ret = common::E_OK;
+ if
(RET_FAIL(common::SerializationUtil::read_char(timeseries_meta_type_,
+ in))) {
+ } else if (RET_FAIL(common::SerializationUtil::read_mystring(
+ measurement_name_, pa, in))) {
+ } else if (RET_FAIL(common::SerializationUtil::read_char(
+ (char &)data_type_, in))) {
+ } else if (RET_FAIL(common::SerializationUtil::read_var_uint(
+ chunk_meta_list_data_size_, in))) {
+ } else if (nullptr ==
+ (statistic_ = StatisticFactory::alloc_statistic_with_pa(
+ data_type_, pa))) {
+ ret = common::E_OOM;
+ } else if (RET_FAIL(statistic_->deserialize_from(in))) {
} else {
- ChunkMeta *my_cm = new(cm_buf) ChunkMeta;
- if (RET_FAIL(my_cm->clone_from(*cm, pa))) {
- } else if (RET_FAIL(chunk_meta_list_->push_back(my_cm))) {
- }
+ statistic_from_pa_ = true;
+ void *chunk_meta_list_buf = pa->alloc(sizeof(*chunk_meta_list_));
+ if (IS_NULL(chunk_meta_list_buf)) {
+ return common::E_OOM;
+ }
+ const bool deserialize_chunk_meta_statistic =
+ (timeseries_meta_type_ & 0x3F); // TODO
+ chunk_meta_list_ =
+ new (chunk_meta_list_buf) common::SimpleList<ChunkMeta *>(pa);
+ uint32_t start_pos = in.read_pos();
+ while (IS_SUCC(ret) &&
+ in.read_pos() < start_pos + chunk_meta_list_data_size_) {
+ void *cm_buf = pa->alloc(sizeof(ChunkMeta));
+ if (IS_NULL(cm_buf)) {
+ ret = common::E_OOM;
+ } else {
+ ChunkMeta *cm = new (cm_buf) ChunkMeta;
+ cm->measurement_name_.shallow_copy_from(
+ this->measurement_name_);
+ cm->data_type_ = this->data_type_;
+ cm->mask_ = 0; // TODO
+ if (RET_FAIL(cm->deserialize_from(
+ in, deserialize_chunk_meta_statistic, pa))) {
+ } else if (RET_FAIL(chunk_meta_list_->push_back(cm))) {
+ }
+ }
+ }
+ }
+ return ret;
+ }
+
+ int clone_from(const TimeseriesIndex &that, common::PageArena *pa) {
+ int ret = common::E_OK;
+ timeseries_meta_type_ = that.timeseries_meta_type_;
+ chunk_meta_list_data_size_ = that.chunk_meta_list_data_size_;
+ ts_id_ = that.ts_id_;
+ data_type_ = that.data_type_;
+
+ statistic_ = StatisticFactory::alloc_statistic_with_pa(data_type_, pa);
+ if (IS_NULL(statistic_)) {
+ return common::E_OOM;
}
- }
- } // end (that.chunk_meta_list_ != nullptr)
- return ret;
- }
+ clone_statistic(that.statistic_, this->statistic_, data_type_);
+ statistic_from_pa_ = true;
+
+ if (RET_FAIL(measurement_name_.dup_from(that.measurement_name_, *pa)))
{
+ return ret;
+ }
+
+ if (that.chunk_meta_list_ != nullptr) {
+ void *buf = pa->alloc(sizeof(*chunk_meta_list_));
+ if (IS_NULL(buf)) {
+ return common::E_OOM;
+ }
+ chunk_meta_list_ = new (buf) common::SimpleList<ChunkMeta *>(pa);
+ common::SimpleList<ChunkMeta *>::Iterator it;
+ for (it = that.chunk_meta_list_->begin();
+ IS_SUCC(ret) && it != that.chunk_meta_list_->end(); it++) {
+ ChunkMeta *cm = it.get();
+ void *cm_buf = pa->alloc(sizeof(ChunkMeta));
+ if (IS_NULL(cm_buf)) {
+ return common::E_OOM;
+ } else {
+ ChunkMeta *my_cm = new (cm_buf) ChunkMeta;
+ if (RET_FAIL(my_cm->clone_from(*cm, pa))) {
+ } else if (RET_FAIL(chunk_meta_list_->push_back(my_cm))) {
+ }
+ }
+ }
+ } // end (that.chunk_meta_list_ != nullptr)
+ return ret;
+ }
#ifndef NDEBUG
- friend std::ostream &operator<<(std::ostream &os,
- const TimeseriesIndex &tsi) {
- os << "{meta_type=" << (int) tsi.timeseries_meta_type_
- << ", chunk_meta_list_data_size=" << tsi.chunk_meta_list_data_size_
- << ", measurement_name=" << tsi.measurement_name_
- << ", ts_id=" << tsi.ts_id_.to_string()
- << ", data_type=" << common::get_data_type_name(tsi.data_type_)
- << ", statistic=" << tsi.statistic_->to_string();
-
- if (tsi.chunk_meta_list_) {
- os << ", chunk_meta_list={";
- int count = 0;
- common::SimpleList<ChunkMeta *>::Iterator it =
- tsi.chunk_meta_list_->begin();
- for (; it != tsi.chunk_meta_list_->end(); it++, count++) {
- if (count != 0) {
- os << ", ";
+ friend std::ostream &operator<<(std::ostream &os,
+ const TimeseriesIndex &tsi) {
+ os << "{meta_type=" << (int)tsi.timeseries_meta_type_
+ << ", chunk_meta_list_data_size=" << tsi.chunk_meta_list_data_size_
+ << ", measurement_name=" << tsi.measurement_name_
+ << ", ts_id=" << tsi.ts_id_.to_string()
+ << ", data_type=" << common::get_data_type_name(tsi.data_type_)
+ << ", statistic=" << tsi.statistic_->to_string();
+
+ if (tsi.chunk_meta_list_) {
+ os << ", chunk_meta_list={";
+ int count = 0;
+ common::SimpleList<ChunkMeta *>::Iterator it =
+ tsi.chunk_meta_list_->begin();
+ for (; it != tsi.chunk_meta_list_->end(); it++, count++) {
+ if (count != 0) {
+ os << ", ";
+ }
+ os << "[" << count << "]={" << *it.get() << "}";
+ }
+ os << "}";
}
- os << "[" << count << "]={" << *it.get() << "}";
- }
- os << "}";
+ return os;
}
- return os;
- }
#endif
-private:
- /*
- * If this timeseries has more than one chunk meta, timeseries_meta_type_
- * is 1. Otherwise timeseries_meta_type_ is 0. It also should OR with mask
- * of chunk meta.
- */
- char timeseries_meta_type_;
-
- // Sum of chunk meta serialized size in List<ChunkMeta> of this timeseries.
- uint32_t chunk_meta_list_data_size_;
-
- // std::string measurement_name_;
- common::String measurement_name_;
- common::TsID ts_id_;
- common::TSDataType data_type_;
-
- /*
- * If TimeseriesIndex has only one ChunkMeta, then
- * TimeseriesIndex.statistic_ is duplicated with ChunkMeta.statistic_. In
- * this case, we do not serialize ChunkMeta.statistic_.
- */
- Statistic *statistic_;
- bool statistic_from_pa_;
- common::ByteStream chunk_meta_list_serialized_buf_;
- // common::PageArena page_arena_;
- common::SimpleList<ChunkMeta *> *chunk_meta_list_; // for deserialize_from
+ private:
+ /*
+ * If this timeseries has more than one chunk meta, timeseries_meta_type_
+ * is 1. Otherwise timeseries_meta_type_ is 0. It also should OR with mask
+ * of chunk meta.
+ */
+ char timeseries_meta_type_;
+
+ // Sum of chunk meta serialized size in List<ChunkMeta> of this timeseries.
+ uint32_t chunk_meta_list_data_size_;
+
+ // std::string measurement_name_;
+ common::String measurement_name_;
+ common::TsID ts_id_;
+ common::TSDataType data_type_;
+
+ /*
+ * If TimeseriesIndex has only one ChunkMeta, then
+ * TimeseriesIndex.statistic_ is duplicated with ChunkMeta.statistic_. In
+ * this case, we do not serialize ChunkMeta.statistic_.
+ */
+ Statistic *statistic_;
+ bool statistic_from_pa_;
+ common::ByteStream chunk_meta_list_serialized_buf_;
+ // common::PageArena page_arena_;
+ common::SimpleList<ChunkMeta *> *chunk_meta_list_; // for deserialize_from
};
class AlignedTimeseriesIndex : public ITimeseriesIndex {
-public:
- TimeseriesIndex *time_ts_idx_;
- TimeseriesIndex *value_ts_idx_;
-
- AlignedTimeseriesIndex() {}
- ~AlignedTimeseriesIndex() {}
- virtual common::SimpleList<ChunkMeta *> *get_time_chunk_meta_list() const {
- return time_ts_idx_->get_chunk_meta_list();
- }
- virtual common::SimpleList<ChunkMeta *> *get_value_chunk_meta_list() const {
- return value_ts_idx_->get_chunk_meta_list();
- }
-
- virtual common::String get_measurement_name() {
- return value_ts_idx_->get_measurement_name();
- }
- virtual common::TSDataType get_data_type() const {
- return time_ts_idx_->get_data_type();
- }
- virtual Statistic *get_statistic() const {
- return value_ts_idx_->get_statistic();
- }
+ public:
+ TimeseriesIndex *time_ts_idx_;
+ TimeseriesIndex *value_ts_idx_;
+
+ AlignedTimeseriesIndex() {}
+ ~AlignedTimeseriesIndex() {}
+ virtual common::SimpleList<ChunkMeta *> *get_time_chunk_meta_list() const {
+ return time_ts_idx_->get_chunk_meta_list();
+ }
+ virtual common::SimpleList<ChunkMeta *> *get_value_chunk_meta_list() const
{
+ return value_ts_idx_->get_chunk_meta_list();
+ }
+
+ virtual common::String get_measurement_name() const {
+ return value_ts_idx_->get_measurement_name();
+ }
+ virtual common::TSDataType get_data_type() const {
+ return time_ts_idx_->get_data_type();
+ }
+ virtual Statistic *get_statistic() const {
+ return value_ts_idx_->get_statistic();
+ }
#ifndef NDEBUG
- friend std::ostream &operator<<(std::ostream &os,
- const AlignedTimeseriesIndex &tsi) {
- os << "time_ts_idx=" << *tsi.time_ts_idx_;
- os << ", value_ts_idx=" << *tsi.value_ts_idx_;
- return os;
- }
+ friend std::ostream &operator<<(std::ostream &os,
+ const AlignedTimeseriesIndex &tsi) {
+ os << "time_ts_idx=" << *tsi.time_ts_idx_;
+ os << ", value_ts_idx=" << *tsi.value_ts_idx_;
+ return os;
+ }
#endif
};
class TSMIterator {
-public:
- explicit TSMIterator(
- common::SimpleList<ChunkGroupMeta *> &chunk_group_meta_list)
- : chunk_group_meta_list_(chunk_group_meta_list),
- chunk_group_meta_iter_(),
- chunk_meta_iter_() {}
-
- // sort => iterate
- int init();
- bool has_next() const;
- int get_next(std::shared_ptr<IDeviceID> &ret_device_id,
- common::String &ret_measurement_name,
- TimeseriesIndex &ret_ts_index);
+ public:
+ explicit TSMIterator(
+ common::SimpleList<ChunkGroupMeta *> &chunk_group_meta_list)
+ : chunk_group_meta_list_(chunk_group_meta_list),
+ chunk_group_meta_iter_(),
+ chunk_meta_iter_() {}
+
+ // sort => iterate
+ int init();
+ bool has_next() const;
+ int get_next(std::shared_ptr<IDeviceID> &ret_device_id,
+ common::String &ret_measurement_name,
+ TimeseriesIndex &ret_ts_index);
private:
common::SimpleList<ChunkGroupMeta *> &chunk_group_meta_list_;
common::SimpleList<ChunkGroupMeta *>::Iterator chunk_group_meta_iter_;
common::SimpleList<ChunkMeta *>::Iterator chunk_meta_iter_;
- // timeseries measurenemnt chunk meta info
- // map <device_name, <measurement_name, vector<chunk_meta>>>
- std::map<std::shared_ptr<IDeviceID>,
- std::map<common::String, std::vector<ChunkMeta *>>>
- tsm_chunk_meta_info_;
+ // timeseries measurenemnt chunk meta info
+ // map <device_name, <measurement_name, vector<chunk_meta>>>
+ std::map<std::shared_ptr<IDeviceID>,
+ std::map<common::String, std::vector<ChunkMeta *>>>
+ tsm_chunk_meta_info_;
- // device iterator
- std::map<std::shared_ptr<IDeviceID>,
- std::map<common::String, std::vector<ChunkMeta *>>>::iterator
- tsm_device_iter_;
+ // device iterator
+ std::map<std::shared_ptr<IDeviceID>,
+ std::map<common::String, std::vector<ChunkMeta *>>>::iterator
+ tsm_device_iter_;
- // measurement iterator
- std::map<common::String, std::vector<ChunkMeta *>>::iterator
- tsm_measurement_iter_;
+ // measurement iterator
+ std::map<common::String, std::vector<ChunkMeta *>>::iterator
+ tsm_measurement_iter_;
};
/* =============== TsFile Index ================ */
struct IComparable {
- virtual ~IComparable() = default;
- virtual bool operator<(const IComparable &other) const = 0;
- virtual bool operator>(const IComparable &other) const = 0;
- virtual bool operator==(const IComparable &other) const = 0;
- virtual int compare(const IComparable &other) {
- if (this->operator<(other)) {
- return -1;
- } else if (this->operator>(other)) {
- return 1;
- } else {
- return 0;
- }
- }
- virtual std::string to_string() const = 0;
+ virtual ~IComparable() = default;
+ virtual bool operator<(const IComparable &other) const = 0;
+ virtual bool operator>(const IComparable &other) const = 0;
+ virtual bool operator==(const IComparable &other) const = 0;
+ virtual int compare(const IComparable &other) {
+ if (this->operator<(other)) {
+ return -1;
+ } else if (this->operator>(other)) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+ virtual std::string to_string() const = 0;
};
struct DeviceIDComparable : IComparable {
- std::shared_ptr<IDeviceID> device_id_;
-
- explicit DeviceIDComparable(const std::shared_ptr<IDeviceID> &device_id)
- : device_id_(device_id) {}
-
- bool operator<(const IComparable &other) const override {
- const auto *other_device =
- dynamic_cast<const DeviceIDComparable *>(&other);
- if (!other_device)
- throw std::runtime_error("Incompatible comparison");
- return device_id_->get_device_name() <
- other_device->device_id_->get_device_name();
- }
-
- bool operator>(const IComparable &other) const override {
- const auto *other_device =
- dynamic_cast<const DeviceIDComparable *>(&other);
- if (!other_device)
- throw std::runtime_error("Incompatible comparison");
- return device_id_->get_device_name() >
- other_device->device_id_->get_device_name();
- }
-
- bool operator==(const IComparable &other) const override {
- const auto *other_device =
- dynamic_cast<const DeviceIDComparable *>(&other);
- if (!other_device)
- throw std::runtime_error("Incompatible comparison");
- return device_id_->get_device_name() ==
- other_device->device_id_->get_device_name();
- }
-
- std::string to_string() const override {
- return device_id_->get_device_name();
- }
+ std::shared_ptr<IDeviceID> device_id_;
+
+ explicit DeviceIDComparable(const std::shared_ptr<IDeviceID> &device_id)
+ : device_id_(device_id) {}
+
+ bool operator<(const IComparable &other) const override {
+ const auto *other_device =
+ dynamic_cast<const DeviceIDComparable *>(&other);
+ if (!other_device) throw std::runtime_error("Incompatible comparison");
+ return device_id_->get_device_name() <
+ other_device->device_id_->get_device_name();
+ }
+
+ bool operator>(const IComparable &other) const override {
+ const auto *other_device =
+ dynamic_cast<const DeviceIDComparable *>(&other);
+ if (!other_device) throw std::runtime_error("Incompatible comparison");
+ return device_id_->get_device_name() >
+ other_device->device_id_->get_device_name();
+ }
+
+ bool operator==(const IComparable &other) const override {
+ const auto *other_device =
+ dynamic_cast<const DeviceIDComparable *>(&other);
+ if (!other_device) throw std::runtime_error("Incompatible comparison");
+ return device_id_->get_device_name() ==
+ other_device->device_id_->get_device_name();
+ }
+
+ std::string to_string() const override {
+ return device_id_->get_device_name();
+ }
};
struct StringComparable : IComparable {
- std::string value_;
-
- explicit StringComparable(const std::string &value) : value_(value) {}
-
- bool operator<(const IComparable &other) const override {
- const auto *other_string =
- dynamic_cast<const StringComparable *>(&other);
- if (!other_string)
- throw std::runtime_error("Incompatible comparison");
- return value_ < other_string->value_;
- }
-
- bool operator>(const IComparable &other) const override {
- const auto *other_string =
- dynamic_cast<const StringComparable *>(&other);
- if (!other_string)
- throw std::runtime_error("Incompatible comparison");
- return value_ > other_string->value_;
- }
-
- bool operator==(const IComparable &other) const override {
- const auto *other_string =
- dynamic_cast<const StringComparable *>(&other);
- if (!other_string)
- throw std::runtime_error("Incompatible comparison");
- return value_ == other_string->value_;
- }
-
- std::string to_string() const override { return value_; }
+ std::string value_;
+
+ explicit StringComparable(const std::string &value) : value_(value) {}
+
+ bool operator<(const IComparable &other) const override {
+ const auto *other_string =
+ dynamic_cast<const StringComparable *>(&other);
+ if (!other_string) throw std::runtime_error("Incompatible comparison");
+ return value_ < other_string->value_;
+ }
+
+ bool operator>(const IComparable &other) const override {
+ const auto *other_string =
+ dynamic_cast<const StringComparable *>(&other);
+ if (!other_string) throw std::runtime_error("Incompatible comparison");
+ return value_ > other_string->value_;
+ }
+
+ bool operator==(const IComparable &other) const override {
+ const auto *other_string =
+ dynamic_cast<const StringComparable *>(&other);
+ if (!other_string) throw std::runtime_error("Incompatible comparison");
+ return value_ == other_string->value_;
+ }
+
+ std::string to_string() const override { return value_; }
};
struct IMetaIndexEntry {
- static void self_destructor(IMetaIndexEntry *ptr) {
- if (ptr) {
- ptr->~IMetaIndexEntry();
- }
- }
- IMetaIndexEntry() = default;
- virtual ~IMetaIndexEntry() = default;
-
- virtual int serialize_to(common::ByteStream &out) { return common::E_OK; }
- virtual int deserialize_from(common::ByteStream &out,
- common::PageArena *pa) {
- return common::E_OK;
- }
- virtual int64_t get_offset() const { return 0; }
- virtual bool is_device_level() const { return false; }
- virtual std::shared_ptr<IComparable> get_compare_key() const {
- return std::shared_ptr<IComparable>();
- }
- virtual common::String get_name() const { return {}; }
- virtual std::shared_ptr<IDeviceID> get_device_id() const { return nullptr; }
- virtual void clone(std::shared_ptr<IMetaIndexEntry> entry, common::PageArena
*pa) {}
+ static void self_destructor(IMetaIndexEntry *ptr) {
+ if (ptr) {
+ ptr->~IMetaIndexEntry();
+ }
+ }
+ IMetaIndexEntry() = default;
+ virtual ~IMetaIndexEntry() = default;
+
+ virtual int serialize_to(common::ByteStream &out) { return common::E_OK; }
+ virtual int deserialize_from(common::ByteStream &out,
+ common::PageArena *pa) {
+ return common::E_OK;
+ }
Review Comment:
For these virtual functions, returning a not-supported error would be better.
##########
cpp/src/common/schema.h:
##########
@@ -20,319 +20,342 @@
#ifndef COMMON_SCHEMA_H
#define COMMON_SCHEMA_H
+#include <algorithm>
#include <map> // use unordered_map instead
#include <memory>
#include <string>
+#include <unordered_map>
#include "common/db_common.h"
#include "writer/time_chunk_writer.h"
#include "writer/value_chunk_writer.h"
namespace storage {
- class ChunkWriter;
-}
+class ChunkWriter;
+class ValueChunkWriter;
+class TimeChunkWriter;
+} // namespace storage
namespace storage {
- /* schema information for one measurement */
- struct MeasurementSchema {
- std::string measurement_name_; // for example: "s1"
- common::TSDataType data_type_;
- common::TSEncoding encoding_;
- common::CompressionType compression_type_;
- storage::ChunkWriter *chunk_writer_;
- ValueChunkWriter *value_chunk_writer_;
- std::map<std::string, std::string> props_;
-
- MeasurementSchema()
- : measurement_name_(),
- data_type_(common::INVALID_DATATYPE),
- encoding_(common::INVALID_ENCODING),
- compression_type_(common::INVALID_COMPRESSION),
- chunk_writer_(nullptr),
- value_chunk_writer_(nullptr) {
- }
-
- MeasurementSchema(const std::string &measurement_name,
- common::TSDataType data_type)
- : measurement_name_(measurement_name),
- data_type_(data_type),
- encoding_(get_default_encoding_for_type(data_type)),
- compression_type_(common::UNCOMPRESSED),
- chunk_writer_(nullptr),
- value_chunk_writer_(nullptr) {
- }
-
- MeasurementSchema(const std::string &measurement_name,
- common::TSDataType data_type, common::TSEncoding
encoding,
- common::CompressionType compression_type)
- : measurement_name_(measurement_name),
- data_type_(data_type),
- encoding_(encoding),
- compression_type_(compression_type),
- chunk_writer_(nullptr),
- value_chunk_writer_(nullptr) {
- }
-
- int serialize_to(common::ByteStream &out) {
- int ret = common::E_OK;
- if (RET_FAIL(
+/* schema information for one measurement */
+struct MeasurementSchema {
+ std::string measurement_name_; // for example: "s1"
+ common::TSDataType data_type_;
+ common::TSEncoding encoding_;
+ common::CompressionType compression_type_;
+ storage::ChunkWriter *chunk_writer_;
+ ValueChunkWriter *value_chunk_writer_;
+ std::map<std::string, std::string> props_;
+
+ MeasurementSchema()
+ : measurement_name_(),
+ data_type_(common::INVALID_DATATYPE),
+ encoding_(common::INVALID_ENCODING),
+ compression_type_(common::INVALID_COMPRESSION),
+ chunk_writer_(nullptr),
+ value_chunk_writer_(nullptr) {}
+
+ MeasurementSchema(const std::string &measurement_name,
+ common::TSDataType data_type)
+ : measurement_name_(measurement_name),
+ data_type_(data_type),
+ encoding_(get_default_encoding_for_type(data_type)),
+ compression_type_(common::UNCOMPRESSED),
+ chunk_writer_(nullptr),
+ value_chunk_writer_(nullptr) {}
+
+ MeasurementSchema(const std::string &measurement_name,
+ common::TSDataType data_type, common::TSEncoding
encoding,
+ common::CompressionType compression_type)
+ : measurement_name_(measurement_name),
+ data_type_(data_type),
+ encoding_(encoding),
+ compression_type_(compression_type),
+ chunk_writer_(nullptr),
+ value_chunk_writer_(nullptr) {}
+
+ int serialize_to(common::ByteStream &out) {
+ int ret = common::E_OK;
+ if (RET_FAIL(
common::SerializationUtil::write_str(measurement_name_, out)))
{
- } else if (RET_FAIL(
- common::SerializationUtil::write_ui8(data_type_, out))) {
- } else if (RET_FAIL(
- common::SerializationUtil::write_ui8(encoding_, out))) {
- } else if (RET_FAIL(common::SerializationUtil::write_ui8(
- compression_type_, out))) {
- }
- if (ret == common::E_OK) {
- if
(RET_FAIL(common::SerializationUtil::write_ui32(props_.size(),
- out))) {
- for (const auto &prop: props_) {
- if (RET_FAIL(common::SerializationUtil::write_str(
+ } else if (RET_FAIL(
+ common::SerializationUtil::write_ui8(data_type_, out)))
{
+ } else if (RET_FAIL(
+ common::SerializationUtil::write_ui8(encoding_, out))) {
+ } else if (RET_FAIL(common::SerializationUtil::write_ui8(
+ compression_type_, out))) {
+ }
+ if (ret == common::E_OK) {
+ if (RET_FAIL(common::SerializationUtil::write_ui32(props_.size(),
+ out))) {
+ for (const auto &prop : props_) {
+ if (RET_FAIL(common::SerializationUtil::write_str(
prop.first, out))) {
- } else if
(RET_FAIL(common::SerializationUtil::write_str(
- prop.second, out))) {
- }
- if (IS_FAIL(ret)) break;
+ } else if (RET_FAIL(common::SerializationUtil::write_str(
+ prop.second, out))) {
}
+ if (IS_FAIL(ret)) break;
}
}
- return ret;
}
+ return ret;
+ }
- int deserialize_from(common::ByteStream &in) {
- int ret = common::E_OK;
- uint8_t data_type = common::TSDataType::INVALID_DATATYPE,
+ int deserialize_from(common::ByteStream &in) {
+ int ret = common::E_OK;
+ uint8_t data_type = common::TSDataType::INVALID_DATATYPE,
encoding = common::TSEncoding::INVALID_ENCODING,
compression_type =
common::CompressionType::INVALID_COMPRESSION;
- if (RET_FAIL(
+ if (RET_FAIL(
common::SerializationUtil::read_str(measurement_name_, in))) {
- } else if (RET_FAIL(
- common::SerializationUtil::read_ui8(data_type, in))) {
- } else if (RET_FAIL(
- common::SerializationUtil::read_ui8(encoding, in))) {
- } else if (RET_FAIL(common::SerializationUtil::read_ui8(
- compression_type, in))) {
- }
- data_type_ = static_cast<common::TSDataType>(data_type);
- encoding_ = static_cast<common::TSEncoding>(encoding);
- compression_type_ =
static_cast<common::CompressionType>(compression_type);
- uint32_t props_size;
- if (ret == common::E_OK) {
- if (RET_FAIL(common::SerializationUtil::read_ui32(props_size,
- in))) {
- for (uint32_t i = 0; i < props_.size(); ++i) {
- std::string key, value;
- if (RET_FAIL(common::SerializationUtil::read_str(
- key, in))) {
- } else if
(RET_FAIL(common::SerializationUtil::read_str(
- value, in))) {
- }
- props_.insert(std::make_pair(key, value));
- if (IS_FAIL(ret)) break;
+ } else if (RET_FAIL(
+ common::SerializationUtil::read_ui8(data_type, in))) {
+ } else if (RET_FAIL(
+ common::SerializationUtil::read_ui8(encoding, in))) {
+ } else if (RET_FAIL(common::SerializationUtil::read_ui8(
+ compression_type, in))) {
+ }
+ data_type_ = static_cast<common::TSDataType>(data_type);
+ encoding_ = static_cast<common::TSEncoding>(encoding);
+ compression_type_ =
+ static_cast<common::CompressionType>(compression_type);
+ uint32_t props_size;
+ if (ret == common::E_OK) {
+ if (RET_FAIL(
+ common::SerializationUtil::read_ui32(props_size, in))) {
+ for (uint32_t i = 0; i < props_.size(); ++i) {
+ std::string key, value;
+ if (RET_FAIL(
+ common::SerializationUtil::read_str(key, in))) {
+ } else if (RET_FAIL(common::SerializationUtil::read_str(
+ value, in))) {
}
+ props_.insert(std::make_pair(key, value));
+ if (IS_FAIL(ret)) break;
}
}
- return ret;
}
- };
+ return ret;
+ }
+};
- typedef std::map<std::string, MeasurementSchema *> MeasurementSchemaMap;
- typedef std::map<std::string, MeasurementSchema *>::iterator
+typedef std::map<std::string, MeasurementSchema *> MeasurementSchemaMap;
+typedef std::map<std::string, MeasurementSchema *>::iterator
MeasurementSchemaMapIter;
- typedef std::pair<MeasurementSchemaMapIter, bool>
+typedef std::pair<MeasurementSchemaMapIter, bool>
MeasurementSchemaMapInsertResult;
- /* schema information for a device */
- struct MeasurementSchemaGroup {
- // measurement_name -> MeasurementSchema
- MeasurementSchemaMap measurement_schema_map_;
- bool is_aligned_ = false;
- TimeChunkWriter *time_chunk_writer_ = nullptr;
- };
-
- enum class ColumnCategory { TAG = 0, FIELD = 1 };
-
- class TableSchema {
- public:
- static void to_lowercase_inplace(std::string &str) {
- std::transform(str.begin(), str.end(), str.begin(),
- [](unsigned char c) -> unsigned char { return
std::tolower(c); });
- }
-
- TableSchema() = default;
-
- TableSchema(const std::string &table_name,
- const std::vector<MeasurementSchema*>
- &column_schemas,
- const std::vector<ColumnCategory> &column_categories)
- : table_name_(table_name),
- column_categories_(column_categories) {
- to_lowercase_inplace(table_name_);
- for (const auto column_schema : column_schemas) {
- if (column_schema != nullptr) {
-
column_schemas_.emplace_back(std::shared_ptr<MeasurementSchema>(column_schema));
- }
- }
- int idx = 0;
- for (const auto &measurement_schema: column_schemas_) {
- to_lowercase_inplace(measurement_schema->measurement_name_);
- column_pos_index_.insert(
- std::make_pair(measurement_schema->measurement_name_,
idx++));
+/* schema information for a device */
+struct MeasurementSchemaGroup {
+ // measurement_name -> MeasurementSchema
+ MeasurementSchemaMap measurement_schema_map_;
+ bool is_aligned_ = false;
+ TimeChunkWriter *time_chunk_writer_ = nullptr;
+};
+
+enum class ColumnCategory { TAG = 0, FIELD = 1 };
+
+class TableSchema {
+ public:
+ static void to_lowercase_inplace(std::string &str) {
+ std::transform(
+ str.begin(), str.end(), str.begin(),
+ [](unsigned char c) -> unsigned char { return std::tolower(c); });
+ }
+
+ TableSchema() = default;
+
+ TableSchema(const std::string &table_name,
+ const std::vector<MeasurementSchema *> &column_schemas,
+ const std::vector<ColumnCategory> &column_categories)
+ : table_name_(table_name), column_categories_(column_categories) {
+ to_lowercase_inplace(table_name_);
+ for (const auto column_schema : column_schemas) {
+ if (column_schema != nullptr) {
+ column_schemas_.emplace_back(
+ std::shared_ptr<MeasurementSchema>(column_schema));
}
}
-
- TableSchema(TableSchema &&other) noexcept
- : table_name_(std::move(other.table_name_)),
- column_schemas_(std::move(other.column_schemas_)),
- column_categories_(std::move(other.column_categories_)) {
+ int idx = 0;
+ for (const auto &measurement_schema : column_schemas_) {
+ to_lowercase_inplace(measurement_schema->measurement_name_);
+ column_pos_index_.insert(
+ std::make_pair(measurement_schema->measurement_name_, idx++));
}
+ }
- TableSchema(const TableSchema &other) = default;
+ TableSchema(TableSchema &&other) noexcept
+ : table_name_(std::move(other.table_name_)),
+ column_schemas_(std::move(other.column_schemas_)),
+ column_categories_(std::move(other.column_categories_)) {}
- int serialize_to(common::ByteStream &out) {
- int ret = common::E_OK;
- if (RET_FAIL(common::SerializationUtil::write_var_uint(
+ int serialize_to(common::ByteStream &out) {
+ int ret = common::E_OK;
+ if (RET_FAIL(common::SerializationUtil::write_var_uint(
column_schemas_.size(), out))) {
- } else {
- for (size_t i = 0; IS_SUCC(ret) && i < column_schemas_.size();
- i++) {
- auto column_schema = column_schemas_[i];
- auto column_category = column_categories_[i];
- if (RET_FAIL(column_schema->serialize_to(out))) {
- } else if (RET_FAIL(common::SerializationUtil::write_i8(
- static_cast<int8_t>(column_category), out))) {
- }
+ } else {
+ for (size_t i = 0; IS_SUCC(ret) && i < column_schemas_.size();
+ i++) {
+ auto column_schema = column_schemas_[i];
+ auto column_category = column_categories_[i];
+ if (RET_FAIL(column_schema->serialize_to(out))) {
+ } else if (RET_FAIL(common::SerializationUtil::write_i8(
+ static_cast<int8_t>(column_category), out))) {
}
}
- return ret;
}
-
- int deserialize(common::ByteStream &in) {
- int ret = common::E_OK;
- uint32_t num_columns;
- if (RET_FAIL(common::SerializationUtil::read_var_uint(
- num_columns, in))) {
- } else {
- for (size_t i = 0; IS_SUCC(ret) && i < num_columns;
- i++) {
- auto column_schema = std::make_shared<MeasurementSchema>();
- int8_t column_category = 0;
- if (RET_FAIL(column_schema->deserialize_from(in))) {
- } else if (RET_FAIL(common::SerializationUtil::read_i8(
- column_category, in))) {
- }
- column_schemas_.emplace_back(column_schema);
-
column_categories_.emplace_back(static_cast<ColumnCategory>(column_category));
+ return ret;
+ }
+
+ int deserialize(common::ByteStream &in) {
+ int ret = common::E_OK;
+ uint32_t num_columns;
+ if (RET_FAIL(
+ common::SerializationUtil::read_var_uint(num_columns, in))) {
+ } else {
+ for (size_t i = 0; IS_SUCC(ret) && i < num_columns; i++) {
+ auto column_schema = std::make_shared<MeasurementSchema>();
+ int8_t column_category = 0;
+ if (RET_FAIL(column_schema->deserialize_from(in))) {
+ } else if (RET_FAIL(common::SerializationUtil::read_i8(
+ column_category, in))) {
}
+ column_schemas_.emplace_back(column_schema);
+ column_categories_.emplace_back(
+ static_cast<ColumnCategory>(column_category));
}
- return ret;
}
+ return ret;
+ }
- ~TableSchema() {
- column_schemas_.clear();
- }
+ ~TableSchema() { column_schemas_.clear(); }
- const std::string &get_table_name() { return table_name_; }
+ const std::string &get_table_name() { return table_name_; }
- std::vector<std::string> get_measurement_names() const {
- std::vector<std::string> ret(column_schemas_.size());
- for (size_t i = 0; i < column_schemas_.size(); i++) {
- ret[i] = column_schemas_[i]->measurement_name_;
- }
- return ret;
+ std::vector<std::string> get_measurement_names() const {
+ std::vector<std::string> ret(column_schemas_.size());
+ for (size_t i = 0; i < column_schemas_.size(); i++) {
+ ret[i] = column_schemas_[i]->measurement_name_;
}
-
- int find_column_index(const std::string &column_name) {
- std::string lower_case_column_name = to_lower(column_name);
- auto it = column_pos_index_.find(lower_case_column_name);
- if (it != column_pos_index_.end()) {
- return it->second;
- } else {
- int index = -1;
- for (size_t i = 0; i < column_schemas_.size(); ++i) {
- if (to_lower(column_schemas_[i]->measurement_name_) ==
- lower_case_column_name) {
- index = static_cast<int>(i);
- break;
- }
+ return ret;
+ }
+
+ int find_column_index(const std::string &column_name) {
+ std::string lower_case_column_name = to_lower(column_name);
+ auto it = column_pos_index_.find(lower_case_column_name);
+ if (it != column_pos_index_.end()) {
+ return it->second;
+ } else {
+ int index = -1;
+ for (size_t i = 0; i < column_schemas_.size(); ++i) {
+ if (to_lower(column_schemas_[i]->measurement_name_) ==
+ lower_case_column_name) {
+ index = static_cast<int>(i);
+ break;
}
- column_pos_index_[lower_case_column_name] = index;
- return index;
}
+ column_pos_index_[lower_case_column_name] = index;
+ return index;
Review Comment:
Need to check all places where column_pos_index_ is used: caching an entry
for a non-existent column_name may leave unused space that is never released.
##########
cpp/src/common/tsblock/tsblock.h:
##########
@@ -178,6 +196,16 @@ class ColAppender {
FORCE_INLINE uint32_t get_col_row_count() { return column_row_count_; }
FORCE_INLINE uint32_t get_column_index() { return column_index_; }
+ FORCE_INLINE bool fill(const char *value, uint32_t len,
+ uint32_t end_index) {
+ while (column_row_count_ < end_index) {
+ if (!add_row()) {
+ return false;
+ }
+ append(value, len);
+ }
+ return true;
+ }
Review Comment:
The return value is not used. Use void instead.
##########
cpp/src/common/tsblock/tsblock.h:
##########
@@ -95,6 +103,16 @@ class TsBlock {
row_count_ = 0;
}
+ FORCE_INLINE static TsBlock *create_tsblock(TupleDesc *tupledesc,
+ uint32_t max_row_count = 0) {
+ TsBlock *tsblock = new TsBlock(tupledesc, max_row_count);
+ if (IS_FAIL(tsblock->init())) {
+ delete tsblock;
+ tsblock = nullptr;
+ }
+ return tsblock;
Review Comment:
This func can return nullptr, which will cause other modules to crash.
Check its usage in other modules, or return an error code.
##########
cpp/src/reader/block/single_device_tsblock_reader.cc:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "single_device_tsblock_reader.h"
+
+namespace storage {
+
+SingleDeviceTsBlockReader::SingleDeviceTsBlockReader(
+ DeviceQueryTask* device_query_task, uint32_t block_size,
+ IMetadataQuerier* metadata_querier, TsFileIOReader* tsfile_io_reader,
+ Filter* time_filter, Filter* field_filter)
+ : device_query_task_(device_query_task),
+ field_filter_(field_filter),
+ block_size_(block_size),
+ tuple_desc_(),
+ tsfile_io_reader_(tsfile_io_reader) {
+ pa_.init(512, common::AllocModID::MOD_TSFILE_READER);
+ tuple_desc_.reset();
+ common::init_common();
+ tuple_desc_.push_back(common::g_time_column_desc);
+ auto table_schema = device_query_task->get_table_schema();
+ for (const auto& column_name : device_query_task_->get_column_names()) {
+ common::ColumnDesc column_desc(
+ table_schema->get_column_desc(column_name));
+ tuple_desc_.push_back(column_desc);
+ }
+ current_block_ = common::TsBlock::create_tsblock(&tuple_desc_, block_size);
+ col_appenders_.resize(tuple_desc_.get_column_count());
+ for (int i = 0; i < tuple_desc_.get_column_count(); i++) {
+ col_appenders_[i] = new common::ColAppender(i, current_block_);
+ }
+ row_appender_ = new common::RowAppender(current_block_);
+ std::vector<ITimeseriesIndex*> time_series_indexs(
+ device_query_task_->get_column_names().size());
+ tsfile_io_reader_->get_timeseries_indexes(
+ device_query_task->get_device_id(),
+ device_query_task->get_column_names(), time_series_indexs, pa_);
Review Comment:
Check the return code. time_series_indexs may contain nullptr.
##########
cpp/src/common/schema.h:
##########
@@ -20,319 +20,342 @@
#ifndef COMMON_SCHEMA_H
#define COMMON_SCHEMA_H
+#include <algorithm>
#include <map> // use unordered_map instead
#include <memory>
#include <string>
+#include <unordered_map>
#include "common/db_common.h"
#include "writer/time_chunk_writer.h"
#include "writer/value_chunk_writer.h"
namespace storage {
- class ChunkWriter;
-}
+class ChunkWriter;
+class ValueChunkWriter;
+class TimeChunkWriter;
+} // namespace storage
namespace storage {
- /* schema information for one measurement */
- struct MeasurementSchema {
- std::string measurement_name_; // for example: "s1"
- common::TSDataType data_type_;
- common::TSEncoding encoding_;
- common::CompressionType compression_type_;
- storage::ChunkWriter *chunk_writer_;
- ValueChunkWriter *value_chunk_writer_;
- std::map<std::string, std::string> props_;
-
- MeasurementSchema()
- : measurement_name_(),
- data_type_(common::INVALID_DATATYPE),
- encoding_(common::INVALID_ENCODING),
- compression_type_(common::INVALID_COMPRESSION),
- chunk_writer_(nullptr),
- value_chunk_writer_(nullptr) {
- }
-
- MeasurementSchema(const std::string &measurement_name,
- common::TSDataType data_type)
- : measurement_name_(measurement_name),
- data_type_(data_type),
- encoding_(get_default_encoding_for_type(data_type)),
- compression_type_(common::UNCOMPRESSED),
- chunk_writer_(nullptr),
- value_chunk_writer_(nullptr) {
- }
-
- MeasurementSchema(const std::string &measurement_name,
- common::TSDataType data_type, common::TSEncoding
encoding,
- common::CompressionType compression_type)
- : measurement_name_(measurement_name),
- data_type_(data_type),
- encoding_(encoding),
- compression_type_(compression_type),
- chunk_writer_(nullptr),
- value_chunk_writer_(nullptr) {
- }
-
- int serialize_to(common::ByteStream &out) {
- int ret = common::E_OK;
- if (RET_FAIL(
+/* schema information for one measurement */
+struct MeasurementSchema {
+ std::string measurement_name_; // for example: "s1"
+ common::TSDataType data_type_;
+ common::TSEncoding encoding_;
+ common::CompressionType compression_type_;
+ storage::ChunkWriter *chunk_writer_;
+ ValueChunkWriter *value_chunk_writer_;
+ std::map<std::string, std::string> props_;
+
+ MeasurementSchema()
+ : measurement_name_(),
+ data_type_(common::INVALID_DATATYPE),
+ encoding_(common::INVALID_ENCODING),
+ compression_type_(common::INVALID_COMPRESSION),
+ chunk_writer_(nullptr),
+ value_chunk_writer_(nullptr) {}
+
+ MeasurementSchema(const std::string &measurement_name,
+ common::TSDataType data_type)
+ : measurement_name_(measurement_name),
+ data_type_(data_type),
+ encoding_(get_default_encoding_for_type(data_type)),
+ compression_type_(common::UNCOMPRESSED),
+ chunk_writer_(nullptr),
+ value_chunk_writer_(nullptr) {}
+
+ MeasurementSchema(const std::string &measurement_name,
+ common::TSDataType data_type, common::TSEncoding
encoding,
+ common::CompressionType compression_type)
+ : measurement_name_(measurement_name),
+ data_type_(data_type),
+ encoding_(encoding),
+ compression_type_(compression_type),
+ chunk_writer_(nullptr),
+ value_chunk_writer_(nullptr) {}
+
+ int serialize_to(common::ByteStream &out) {
+ int ret = common::E_OK;
+ if (RET_FAIL(
common::SerializationUtil::write_str(measurement_name_, out)))
{
- } else if (RET_FAIL(
- common::SerializationUtil::write_ui8(data_type_, out))) {
- } else if (RET_FAIL(
- common::SerializationUtil::write_ui8(encoding_, out))) {
- } else if (RET_FAIL(common::SerializationUtil::write_ui8(
- compression_type_, out))) {
- }
- if (ret == common::E_OK) {
- if
(RET_FAIL(common::SerializationUtil::write_ui32(props_.size(),
- out))) {
- for (const auto &prop: props_) {
- if (RET_FAIL(common::SerializationUtil::write_str(
+ } else if (RET_FAIL(
+ common::SerializationUtil::write_ui8(data_type_, out)))
{
+ } else if (RET_FAIL(
+ common::SerializationUtil::write_ui8(encoding_, out))) {
+ } else if (RET_FAIL(common::SerializationUtil::write_ui8(
+ compression_type_, out))) {
+ }
+ if (ret == common::E_OK) {
+ if (RET_FAIL(common::SerializationUtil::write_ui32(props_.size(),
+ out))) {
+ for (const auto &prop : props_) {
+ if (RET_FAIL(common::SerializationUtil::write_str(
prop.first, out))) {
- } else if
(RET_FAIL(common::SerializationUtil::write_str(
- prop.second, out))) {
- }
- if (IS_FAIL(ret)) break;
+ } else if (RET_FAIL(common::SerializationUtil::write_str(
+ prop.second, out))) {
}
+ if (IS_FAIL(ret)) break;
}
}
- return ret;
}
+ return ret;
+ }
- int deserialize_from(common::ByteStream &in) {
- int ret = common::E_OK;
- uint8_t data_type = common::TSDataType::INVALID_DATATYPE,
+ int deserialize_from(common::ByteStream &in) {
+ int ret = common::E_OK;
+ uint8_t data_type = common::TSDataType::INVALID_DATATYPE,
encoding = common::TSEncoding::INVALID_ENCODING,
compression_type =
common::CompressionType::INVALID_COMPRESSION;
- if (RET_FAIL(
+ if (RET_FAIL(
common::SerializationUtil::read_str(measurement_name_, in))) {
- } else if (RET_FAIL(
- common::SerializationUtil::read_ui8(data_type, in))) {
- } else if (RET_FAIL(
- common::SerializationUtil::read_ui8(encoding, in))) {
- } else if (RET_FAIL(common::SerializationUtil::read_ui8(
- compression_type, in))) {
- }
- data_type_ = static_cast<common::TSDataType>(data_type);
- encoding_ = static_cast<common::TSEncoding>(encoding);
- compression_type_ =
static_cast<common::CompressionType>(compression_type);
- uint32_t props_size;
- if (ret == common::E_OK) {
- if (RET_FAIL(common::SerializationUtil::read_ui32(props_size,
- in))) {
- for (uint32_t i = 0; i < props_.size(); ++i) {
- std::string key, value;
- if (RET_FAIL(common::SerializationUtil::read_str(
- key, in))) {
- } else if
(RET_FAIL(common::SerializationUtil::read_str(
- value, in))) {
- }
- props_.insert(std::make_pair(key, value));
- if (IS_FAIL(ret)) break;
+ } else if (RET_FAIL(
+ common::SerializationUtil::read_ui8(data_type, in))) {
+ } else if (RET_FAIL(
+ common::SerializationUtil::read_ui8(encoding, in))) {
+ } else if (RET_FAIL(common::SerializationUtil::read_ui8(
+ compression_type, in))) {
+ }
+ data_type_ = static_cast<common::TSDataType>(data_type);
+ encoding_ = static_cast<common::TSEncoding>(encoding);
+ compression_type_ =
+ static_cast<common::CompressionType>(compression_type);
+ uint32_t props_size;
+ if (ret == common::E_OK) {
+ if (RET_FAIL(
+ common::SerializationUtil::read_ui32(props_size, in))) {
+ for (uint32_t i = 0; i < props_.size(); ++i) {
+ std::string key, value;
+ if (RET_FAIL(
+ common::SerializationUtil::read_str(key, in))) {
+ } else if (RET_FAIL(common::SerializationUtil::read_str(
+ value, in))) {
}
+ props_.insert(std::make_pair(key, value));
+ if (IS_FAIL(ret)) break;
}
}
- return ret;
}
- };
+ return ret;
+ }
+};
- typedef std::map<std::string, MeasurementSchema *> MeasurementSchemaMap;
- typedef std::map<std::string, MeasurementSchema *>::iterator
+typedef std::map<std::string, MeasurementSchema *> MeasurementSchemaMap;
+typedef std::map<std::string, MeasurementSchema *>::iterator
MeasurementSchemaMapIter;
- typedef std::pair<MeasurementSchemaMapIter, bool>
+typedef std::pair<MeasurementSchemaMapIter, bool>
MeasurementSchemaMapInsertResult;
- /* schema information for a device */
- struct MeasurementSchemaGroup {
- // measurement_name -> MeasurementSchema
- MeasurementSchemaMap measurement_schema_map_;
- bool is_aligned_ = false;
- TimeChunkWriter *time_chunk_writer_ = nullptr;
- };
-
- enum class ColumnCategory { TAG = 0, FIELD = 1 };
-
- class TableSchema {
- public:
- static void to_lowercase_inplace(std::string &str) {
- std::transform(str.begin(), str.end(), str.begin(),
- [](unsigned char c) -> unsigned char { return
std::tolower(c); });
- }
-
- TableSchema() = default;
-
- TableSchema(const std::string &table_name,
- const std::vector<MeasurementSchema*>
- &column_schemas,
- const std::vector<ColumnCategory> &column_categories)
- : table_name_(table_name),
- column_categories_(column_categories) {
- to_lowercase_inplace(table_name_);
- for (const auto column_schema : column_schemas) {
- if (column_schema != nullptr) {
-
column_schemas_.emplace_back(std::shared_ptr<MeasurementSchema>(column_schema));
- }
- }
- int idx = 0;
- for (const auto &measurement_schema: column_schemas_) {
- to_lowercase_inplace(measurement_schema->measurement_name_);
- column_pos_index_.insert(
- std::make_pair(measurement_schema->measurement_name_,
idx++));
+/* schema information for a device */
+struct MeasurementSchemaGroup {
+ // measurement_name -> MeasurementSchema
+ MeasurementSchemaMap measurement_schema_map_;
+ bool is_aligned_ = false;
+ TimeChunkWriter *time_chunk_writer_ = nullptr;
+};
+
+enum class ColumnCategory { TAG = 0, FIELD = 1 };
+
+class TableSchema {
+ public:
+ static void to_lowercase_inplace(std::string &str) {
+ std::transform(
+ str.begin(), str.end(), str.begin(),
+ [](unsigned char c) -> unsigned char { return std::tolower(c); });
+ }
+
+ TableSchema() = default;
+
+ TableSchema(const std::string &table_name,
+ const std::vector<MeasurementSchema *> &column_schemas,
+ const std::vector<ColumnCategory> &column_categories)
+ : table_name_(table_name), column_categories_(column_categories) {
+ to_lowercase_inplace(table_name_);
+ for (const auto column_schema : column_schemas) {
+ if (column_schema != nullptr) {
+ column_schemas_.emplace_back(
+ std::shared_ptr<MeasurementSchema>(column_schema));
}
}
-
- TableSchema(TableSchema &&other) noexcept
- : table_name_(std::move(other.table_name_)),
- column_schemas_(std::move(other.column_schemas_)),
- column_categories_(std::move(other.column_categories_)) {
+ int idx = 0;
+ for (const auto &measurement_schema : column_schemas_) {
+ to_lowercase_inplace(measurement_schema->measurement_name_);
+ column_pos_index_.insert(
+ std::make_pair(measurement_schema->measurement_name_, idx++));
}
+ }
- TableSchema(const TableSchema &other) = default;
+ TableSchema(TableSchema &&other) noexcept
+ : table_name_(std::move(other.table_name_)),
+ column_schemas_(std::move(other.column_schemas_)),
+ column_categories_(std::move(other.column_categories_)) {}
- int serialize_to(common::ByteStream &out) {
- int ret = common::E_OK;
- if (RET_FAIL(common::SerializationUtil::write_var_uint(
+ int serialize_to(common::ByteStream &out) {
+ int ret = common::E_OK;
+ if (RET_FAIL(common::SerializationUtil::write_var_uint(
column_schemas_.size(), out))) {
- } else {
- for (size_t i = 0; IS_SUCC(ret) && i < column_schemas_.size();
- i++) {
- auto column_schema = column_schemas_[i];
- auto column_category = column_categories_[i];
- if (RET_FAIL(column_schema->serialize_to(out))) {
- } else if (RET_FAIL(common::SerializationUtil::write_i8(
- static_cast<int8_t>(column_category), out))) {
- }
+ } else {
+ for (size_t i = 0; IS_SUCC(ret) && i < column_schemas_.size();
+ i++) {
+ auto column_schema = column_schemas_[i];
+ auto column_category = column_categories_[i];
+ if (RET_FAIL(column_schema->serialize_to(out))) {
+ } else if (RET_FAIL(common::SerializationUtil::write_i8(
+ static_cast<int8_t>(column_category), out))) {
}
}
- return ret;
}
-
- int deserialize(common::ByteStream &in) {
- int ret = common::E_OK;
- uint32_t num_columns;
- if (RET_FAIL(common::SerializationUtil::read_var_uint(
- num_columns, in))) {
- } else {
- for (size_t i = 0; IS_SUCC(ret) && i < num_columns;
- i++) {
- auto column_schema = std::make_shared<MeasurementSchema>();
- int8_t column_category = 0;
- if (RET_FAIL(column_schema->deserialize_from(in))) {
- } else if (RET_FAIL(common::SerializationUtil::read_i8(
- column_category, in))) {
- }
- column_schemas_.emplace_back(column_schema);
-
column_categories_.emplace_back(static_cast<ColumnCategory>(column_category));
+ return ret;
+ }
+
+ int deserialize(common::ByteStream &in) {
+ int ret = common::E_OK;
+ uint32_t num_columns;
+ if (RET_FAIL(
+ common::SerializationUtil::read_var_uint(num_columns, in))) {
+ } else {
+ for (size_t i = 0; IS_SUCC(ret) && i < num_columns; i++) {
+ auto column_schema = std::make_shared<MeasurementSchema>();
+ int8_t column_category = 0;
+ if (RET_FAIL(column_schema->deserialize_from(in))) {
+ } else if (RET_FAIL(common::SerializationUtil::read_i8(
+ column_category, in))) {
}
+ column_schemas_.emplace_back(column_schema);
+ column_categories_.emplace_back(
+ static_cast<ColumnCategory>(column_category));
}
- return ret;
}
+ return ret;
+ }
- ~TableSchema() {
- column_schemas_.clear();
- }
+ ~TableSchema() { column_schemas_.clear(); }
- const std::string &get_table_name() { return table_name_; }
+ const std::string &get_table_name() { return table_name_; }
- std::vector<std::string> get_measurement_names() const {
- std::vector<std::string> ret(column_schemas_.size());
- for (size_t i = 0; i < column_schemas_.size(); i++) {
- ret[i] = column_schemas_[i]->measurement_name_;
- }
- return ret;
+ std::vector<std::string> get_measurement_names() const {
+ std::vector<std::string> ret(column_schemas_.size());
+ for (size_t i = 0; i < column_schemas_.size(); i++) {
+ ret[i] = column_schemas_[i]->measurement_name_;
}
-
- int find_column_index(const std::string &column_name) {
- std::string lower_case_column_name = to_lower(column_name);
- auto it = column_pos_index_.find(lower_case_column_name);
- if (it != column_pos_index_.end()) {
- return it->second;
- } else {
- int index = -1;
- for (size_t i = 0; i < column_schemas_.size(); ++i) {
- if (to_lower(column_schemas_[i]->measurement_name_) ==
- lower_case_column_name) {
- index = static_cast<int>(i);
- break;
- }
+ return ret;
+ }
+
+ int find_column_index(const std::string &column_name) {
+ std::string lower_case_column_name = to_lower(column_name);
+ auto it = column_pos_index_.find(lower_case_column_name);
+ if (it != column_pos_index_.end()) {
+ return it->second;
+ } else {
+ int index = -1;
+ for (size_t i = 0; i < column_schemas_.size(); ++i) {
+ if (to_lower(column_schemas_[i]->measurement_name_) ==
+ lower_case_column_name) {
+ index = static_cast<int>(i);
+ break;
}
- column_pos_index_[lower_case_column_name] = index;
- return index;
}
+ column_pos_index_[lower_case_column_name] = index;
+ return index;
}
-
- void update(ChunkGroupMeta *chunk_group_meta) {
- for (auto iter = chunk_group_meta->chunk_meta_list_.begin();
- iter != chunk_group_meta->chunk_meta_list_.end(); iter++) {
- auto &chunk_meta = iter.get();
- int column_idx =
-
find_column_index(chunk_meta->measurement_name_.to_std_string());
- if (column_idx == -1) {
- auto measurement_schema =
std::make_shared<MeasurementSchema>(
- chunk_meta->measurement_name_.to_std_string(),
- chunk_meta->data_type_, chunk_meta->encoding_,
- chunk_meta->compression_type_);
- column_schemas_.emplace_back(measurement_schema);
- column_categories_.emplace_back(ColumnCategory::FIELD);
- column_pos_index_.insert(
-
std::make_pair(chunk_meta->measurement_name_.to_std_string(),
- column_schemas_.size() - 1));
- } else {
- auto origin_measurement_schema =
column_schemas_.at(column_idx);
- if (origin_measurement_schema->data_type_ !=
- chunk_meta->data_type_) {
- origin_measurement_schema->data_type_ =
- common::TSDataType::STRING;
- }
+ }
+
+ void update(ChunkGroupMeta *chunk_group_meta) {
+ for (auto iter = chunk_group_meta->chunk_meta_list_.begin();
+ iter != chunk_group_meta->chunk_meta_list_.end(); iter++) {
+ auto &chunk_meta = iter.get();
+ int column_idx = find_column_index(
+ chunk_meta->measurement_name_.to_std_string());
+ if (column_idx == -1) {
+ auto measurement_schema = std::make_shared<MeasurementSchema>(
+ chunk_meta->measurement_name_.to_std_string(),
+ chunk_meta->data_type_, chunk_meta->encoding_,
+ chunk_meta->compression_type_);
+ column_schemas_.emplace_back(measurement_schema);
+ column_categories_.emplace_back(ColumnCategory::FIELD);
+ column_pos_index_.insert(std::make_pair(
+ chunk_meta->measurement_name_.to_std_string(),
+ column_schemas_.size() - 1));
+ } else {
+ auto origin_measurement_schema =
column_schemas_.at(column_idx);
+ if (origin_measurement_schema->data_type_ !=
+ chunk_meta->data_type_) {
+ origin_measurement_schema->data_type_ =
+ common::TSDataType::STRING;
}
}
}
+ }
- std::vector<common::TSDataType> get_data_types() const {
- std::vector<common::TSDataType> ret;
- for (const auto &measurement_schema: column_schemas_) {
- ret.emplace_back(measurement_schema->data_type_);
- }
- return ret;
+ std::vector<common::TSDataType> get_data_types() const {
+ std::vector<common::TSDataType> ret;
+ for (const auto &measurement_schema : column_schemas_) {
+ ret.emplace_back(measurement_schema->data_type_);
}
+ return ret;
+ }
- std::vector<ColumnCategory> get_column_categories() const {
- return column_categories_;
- }
+ std::vector<ColumnCategory> get_column_categories() const {
+ return column_categories_;
+ }
- std::vector<std::shared_ptr<MeasurementSchema> >
get_measurement_schemas()
+ std::vector<std::shared_ptr<MeasurementSchema> > get_measurement_schemas()
const {
- return column_schemas_;
- }
-
- private:
- static std::string to_lower(const std::string &str) {
- std::string result;
- std::transform(str.begin(), str.end(), std::back_inserter(result),
- [](unsigned char c) -> unsigned char { return
std::tolower(c); });
- return result;
+ return column_schemas_;
+ }
+
+ common::ColumnDesc get_column_desc(const std::string &column_name) {
+ int column_idx = find_column_index(column_name);
+ return common::ColumnDesc(
+ column_schemas_[column_idx]->data_type_,
+ column_schemas_[column_idx]->encoding_,
+ column_schemas_[column_idx]->compression_type_, INVALID_TTL,
+ column_name, common::TsID());
+ }
Review Comment:
find_column_index may return -1, which will lead to an out-of-bounds vector access.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]