This is an automated email from the ASF dual-hosted git repository.
colinlee pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new 271fa6d0 Fix data losing and loading tsfile in iotdb.
271fa6d0 is described below
commit 271fa6d0e7ded9387726cfa849e8a4afd3bd2267
Author: Colin Lee <[email protected]>
AuthorDate: Fri Mar 14 12:14:27 2025 +0800
Fix data losing and loading tsfile in iotdb.
---
cpp/examples/c_examples/demo_write.c | 6 -
cpp/src/common/schema.h | 12 --
cpp/src/common/tablet.h | 12 ++
cpp/src/common/tsfile_common.h | 4 +-
cpp/src/cwrapper/errno_define_c.h | 3 +-
cpp/src/cwrapper/tsfile_cwrapper.cc | 86 +++++++--
cpp/src/cwrapper/tsfile_cwrapper.h | 24 ++-
cpp/src/encoding/ts2diff_decoder.h | 20 +-
cpp/src/file/tsfile_io_writer.cc | 77 ++++----
cpp/src/file/tsfile_io_writer.h | 1 +
cpp/src/reader/aligned_chunk_reader.cc | 19 +-
cpp/src/reader/aligned_chunk_reader.h | 12 +-
cpp/src/reader/bloom_filter.cc | 2 +-
cpp/src/reader/chunk_reader.h | 3 +-
cpp/src/reader/table_query_executor.cc | 4 +-
cpp/src/reader/table_result_set.cc | 9 +-
cpp/src/reader/tsfile_reader.cc | 14 +-
cpp/src/utils/errno_define.h | 3 +-
cpp/src/utils/storage_utils.h | 14 ++
cpp/src/writer/time_page_writer.cc | 12 +-
cpp/src/writer/tsfile_table_writer.cc | 12 ++
cpp/src/writer/value_page_writer.h | 6 +-
cpp/test/cwrapper/c_release_test.cc | 337 +++++++++++++++++++++++++++++++++
cpp/test/cwrapper/cwrapper_test.cc | 6 +-
24 files changed, 557 insertions(+), 141 deletions(-)
diff --git a/cpp/examples/c_examples/demo_write.c
b/cpp/examples/c_examples/demo_write.c
index 3197a6d2..9f116217 100644
--- a/cpp/examples/c_examples/demo_write.c
+++ b/cpp/examples/c_examples/demo_write.c
@@ -38,20 +38,14 @@ ERRNO write_tsfile() {
table_schema.column_schemas[0] =
(ColumnSchema){.column_name = strdup("id1"),
.data_type = TS_DATATYPE_STRING,
- .compression = TS_COMPRESSION_UNCOMPRESSED,
- .encoding = TS_ENCODING_PLAIN,
.column_category = TAG};
table_schema.column_schemas[1] =
(ColumnSchema){.column_name = strdup("id2"),
.data_type = TS_DATATYPE_STRING,
- .compression = TS_COMPRESSION_UNCOMPRESSED,
- .encoding = TS_ENCODING_PLAIN,
.column_category = TAG};
table_schema.column_schemas[2] =
(ColumnSchema){.column_name = strdup("s1"),
.data_type = TS_DATATYPE_INT32,
- .compression = TS_COMPRESSION_UNCOMPRESSED,
- .encoding = TS_ENCODING_PLAIN,
.column_category = FIELD};
remove("test_c.tsfile");
diff --git a/cpp/src/common/schema.h b/cpp/src/common/schema.h
index e0e2b3b8..72fd028f 100644
--- a/cpp/src/common/schema.h
+++ b/cpp/src/common/schema.h
@@ -410,18 +410,6 @@ class TableSchema {
}
private:
- static void to_lowercase_inplace(std::string &str) {
- std::transform(
- str.begin(), str.end(), str.begin(),
- [](unsigned char c) -> unsigned char { return std::tolower(c); });
- }
- static std::string to_lower(const std::string &str) {
- std::string result;
- std::transform(
- str.begin(), str.end(), std::back_inserter(result),
- [](unsigned char c) -> unsigned char { return std::tolower(c); });
- return result;
- }
std::string table_name_;
std::vector<std::shared_ptr<MeasurementSchema> > column_schemas_;
diff --git a/cpp/src/common/tablet.h b/cpp/src/common/tablet.h
index df24fad7..e69d477e 100644
--- a/cpp/src/common/tablet.h
+++ b/cpp/src/common/tablet.h
@@ -214,6 +214,18 @@ class Tablet {
return schema_vec_->at(column_index).measurement_name_;
}
+    /**
+     * Renames one column by overwriting the measurement_name_ of its entry
+     * in schema_vec_.
+     *
+     * Only schema_vec_ is modified here; schema_map_ is left untouched, so a
+     * caller that relies on that map has to refresh it separately.
+     *
+     * @param column_index Position of the column inside schema_vec_.
+     * @param name         New name to store for that column.
+     */
+    void set_column_name(uint32_t column_index, const std::string &name) {
+        auto &column_schema = schema_vec_->at(column_index);
+        column_schema.measurement_name_ = name;
+    }
+
+    /**
+     * Read-only access to schema_map_.
+     *
+     * @return Const reference to the tablet's own map; nothing is copied.
+     */
+    const std::map<std::string, int>& get_schema_map() const {
+        return this->schema_map_;
+    }
+
+    /**
+     * Replaces schema_map_ with a copy of the given map.
+     *
+     * @param schema_map Map whose contents are copied into schema_map_.
+     */
+    void set_schema_map(const std::map<std::string, int> &schema_map) {
+        this->schema_map_ = schema_map;
+    }
+
friend class TabletColIterator;
friend class TsFileWriter;
friend struct MeasurementNamesFromTablet;
diff --git a/cpp/src/common/tsfile_common.h b/cpp/src/common/tsfile_common.h
index 8a77f1fe..9fca0168 100644
--- a/cpp/src/common/tsfile_common.h
+++ b/cpp/src/common/tsfile_common.h
@@ -1082,10 +1082,10 @@ struct TsFileMeta {
BloomFilter *bloom_filter_;
common::PageArena *page_arena_;
- int get_table_metaindex_node(common::String &table_name,
+ int get_table_metaindex_node(const std::string &table_name,
MetaIndexNode *&ret_node) {
std::map<std::string, std::shared_ptr<MetaIndexNode>>::iterator it =
- table_metadata_index_node_map_.find(table_name.to_std_string());
+ table_metadata_index_node_map_.find(table_name);
if (it == table_metadata_index_node_map_.end()) {
return common::E_TABLE_NOT_EXIST;
}
diff --git a/cpp/src/cwrapper/errno_define_c.h
b/cpp/src/cwrapper/errno_define_c.h
index cf7ad1f4..b767ca88 100644
--- a/cpp/src/cwrapper/errno_define_c.h
+++ b/cpp/src/cwrapper/errno_define_c.h
@@ -27,8 +27,7 @@
#define RET_INVALID_ARG 4
#define RET_OUT_OF_RANGE 5
#define RET_PARTIAL_READ 6
-#define RET_NET_BIND_ERR 7
-#define RET_NET_SOCKET_ERR 8
+#define RET_INVALID_SCHEMA 8
#define RET_NET_EPOLL_ERR 9
#define RET_NET_EPOLL_WAIT_ERR 10
#define RET_NET_RECV_ERR 11
diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc
b/cpp/src/cwrapper/tsfile_cwrapper.cc
index e18c843a..371e8ced 100644
--- a/cpp/src/cwrapper/tsfile_cwrapper.cc
+++ b/cpp/src/cwrapper/tsfile_cwrapper.cc
@@ -64,28 +64,76 @@ WriteFile write_file_new(const char *pathname, ERRNO
*err_code) {
TsFileWriter tsfile_writer_new(WriteFile file, TableSchema *schema,
ERRNO *err_code) {
+ if (schema->column_num == 0) {
+ *err_code = common::E_INVALID_SCHEMA;
+ return nullptr;
+ }
+
init_tsfile_config();
std::vector<common::ColumnSchema> column_schemas;
+ std::set<std::string> column_names;
for (int i = 0; i < schema->column_num; i++) {
ColumnSchema cur_schema = schema->column_schemas[i];
- column_schemas.emplace_back(common::ColumnSchema(
+ if (column_names.find(cur_schema.column_name) != column_names.end()) {
+ *err_code = common::E_INVALID_SCHEMA;
+ return nullptr;
+ }
+ column_names.insert(cur_schema.column_name);
+ if (cur_schema.column_category == TAG &&
+ cur_schema.data_type != TS_DATATYPE_STRING) {
+ *err_code = common::E_INVALID_SCHEMA;
+ return nullptr;
+ }
+
+ column_schemas.emplace_back(
cur_schema.column_name,
static_cast<common::TSDataType>(cur_schema.data_type),
- static_cast<common::CompressionType>(cur_schema.compression),
- static_cast<common::TSEncoding>(cur_schema.encoding),
- static_cast<common::ColumnCategory>(cur_schema.column_category)));
+ static_cast<common::ColumnCategory>(cur_schema.column_category));
}
- // There is no need to free table_schema.
storage::TableSchema *table_schema =
new storage::TableSchema(schema->table_name, column_schemas);
- *err_code = common::E_OK;
auto table_writer = new storage::TsFileTableWriter(
static_cast<storage::WriteFile *>(file), table_schema);
delete table_schema;
+ *err_code = common::E_OK;
return table_writer;
}
+/**
+ * Creates a table writer on `file` with a caller-supplied memory threshold.
+ *
+ * The C schema is validated and converted into a storage::TableSchema before
+ * the writer is constructed. The schema is rejected with E_INVALID_SCHEMA
+ * when
+ *   - it has no columns,
+ *   - a column has no name, or two columns share the same name, or
+ *   - a TAG column is not of type TS_DATATYPE_STRING
+ * (the duplicate-name and TAG rules match tsfile_writer_new()).
+ *
+ * @param file             WriteFile handle the writer will write to.
+ * @param schema           Table schema; only read here, never freed.
+ * @param memory_threshold Forwarded unchanged to storage::TsFileTableWriter.
+ * @param err_code         [out] common::E_OK on success, otherwise
+ *                         common::E_INVALID_SCHEMA.
+ * @return New writer handle, or nullptr when the schema is rejected.
+ */
+TsFileWriter tsfile_writer_new_with_memory_threshold(WriteFile file,
+                                                     TableSchema *schema,
+                                                     uint64_t memory_threshold,
+                                                     ERRNO *err_code) {
+    if (schema->column_num == 0) {
+        *err_code = common::E_INVALID_SCHEMA;
+        return nullptr;
+    }
+    init_tsfile_config();
+    std::vector<common::ColumnSchema> column_schemas;
+    std::set<std::string> column_names;
+    for (int i = 0; i < schema->column_num; i++) {
+        const ColumnSchema &cur_schema = schema->column_schemas[i];
+        // insert() reports whether the name was newly added; a name that is
+        // already present means a duplicate column. Testing for "not found"
+        // here instead would reject the very first column of every schema.
+        if (cur_schema.column_name == nullptr ||
+            !column_names.insert(std::string(cur_schema.column_name)).second) {
+            *err_code = common::E_INVALID_SCHEMA;
+            return nullptr;
+        }
+        // Same rule as tsfile_writer_new(): TAG columns must be strings.
+        if (cur_schema.column_category == TAG &&
+            cur_schema.data_type != TS_DATATYPE_STRING) {
+            *err_code = common::E_INVALID_SCHEMA;
+            return nullptr;
+        }
+        column_schemas.emplace_back(
+            cur_schema.column_name,
+            static_cast<common::TSDataType>(cur_schema.data_type),
+            static_cast<common::ColumnCategory>(cur_schema.column_category));
+    }
+
+    // The schema object is only needed while the writer is being constructed
+    // (it was always destroyed right afterwards), so it lives on the stack
+    // and cannot leak if construction of the writer fails.
+    storage::TableSchema table_schema(schema->table_name, column_schemas);
+
+    auto table_writer =
+        new storage::TsFileTableWriter(static_cast<storage::WriteFile *>(file),
+                                       &table_schema, memory_threshold);
+    *err_code = common::E_OK;
+    return table_writer;
+}
TsFileReader tsfile_reader_new(const char *pathname, ERRNO *err_code) {
init_tsfile_config();
auto reader = new storage::TsFileReader();
@@ -99,6 +147,9 @@ TsFileReader tsfile_reader_new(const char *pathname, ERRNO
*err_code) {
}
ERRNO tsfile_writer_close(TsFileWriter writer) {
+ if (writer == nullptr) {
+ return common::E_OK;
+ }
auto *w = static_cast<storage::TsFileTableWriter *>(writer);
int ret = w->flush();
if (ret != common::E_OK) {
@@ -264,7 +315,8 @@ bool tsfile_result_set_next(ResultSet result_set, ERRNO
*err_code) {
type tsfile_result_set_get_value_by_name_##type(ResultSet result_set,
\
const char *column_name) {
\
auto *r = static_cast<storage::TableResultSet *>(result_set);
\
- return r->get_value<type>(column_name);
\
+ std::string column_name_(column_name);
\
+ return r->get_value<type>(storage::to_lower(column_name_));
\
}
TSFILE_RESULT_SET_GET_VALUE_BY_NAME_DEF(bool);
@@ -275,7 +327,9 @@ TSFILE_RESULT_SET_GET_VALUE_BY_NAME_DEF(double);
char *tsfile_result_set_get_value_by_name_string(ResultSet result_set,
const char *column_name) {
auto *r = static_cast<storage::TableResultSet *>(result_set);
- common::String *ret = r->get_value<common::String *>(column_name);
+ std::string column_name_(column_name);
+ common::String *ret =
+ r->get_value<common::String *>(storage::to_lower(column_name_));
// Caller should free return's char* 's space.
char *dup = (char *)malloc(ret->len_ + 1);
if (dup) {
@@ -340,8 +394,8 @@ ResultSetMetaData tsfile_result_set_get_metadata(ResultSet
result_set) {
for (int i = 0; i < meta_data.column_num; i++) {
meta_data.column_names[i] =
strdup(result_set_metadata->get_column_name(i + 1).c_str());
- meta_data.data_types[i] =
- static_cast<TSDataType>(result_set_metadata->get_column_type(i +
1));
+ meta_data.data_types[i] = static_cast<TSDataType>(
+ result_set_metadata->get_column_type(i + 1));
}
return meta_data;
}
@@ -414,10 +468,6 @@ TableSchema tsfile_reader_get_table_schema(TsFileReader
reader,
strdup(column_schema->measurement_name_.c_str());
ret_schema.column_schemas[i].data_type =
static_cast<TSDataType>(column_schema->data_type_);
- ret_schema.column_schemas[i].compression =
- static_cast<CompressionType>(column_schema->compression_type_);
- ret_schema.column_schemas[i].encoding =
- static_cast<TSEncoding>(column_schema->encoding_);
ret_schema.column_schemas[i].column_category =
static_cast<ColumnCategory>(
table_shcema->get_column_categories()[i]);
@@ -444,10 +494,6 @@ TableSchema
*tsfile_reader_get_all_table_schemas(TsFileReader reader,
strdup(column_schemas[j]->measurement_name_.c_str());
ret[i].column_schemas[j].data_type =
static_cast<TSDataType>(column_schemas[j]->data_type_);
- ret[i].column_schemas[j].encoding =
- static_cast<TSEncoding>(column_schemas[j]->encoding_);
- ret[i].column_schemas[j].compression =
static_cast<CompressionType>(
- column_schemas[j]->compression_type_);
ret[i].column_schemas[j].column_category =
static_cast<ColumnCategory>(
table_schemas[i]->get_column_categories()[j]);
@@ -502,7 +548,9 @@ void free_table_schema(TableSchema schema) {
for (int i = 0; i < schema.column_num; i++) {
free_column_schema(schema.column_schemas[i]);
}
- free(schema.column_schemas);
+ if (schema.column_num > 0) {
+ free(schema.column_schemas);
+ }
}
void free_column_schema(ColumnSchema schema) { free(schema.column_name); }
diff --git a/cpp/src/cwrapper/tsfile_cwrapper.h
b/cpp/src/cwrapper/tsfile_cwrapper.h
index 16b2cd21..b152782b 100644
--- a/cpp/src/cwrapper/tsfile_cwrapper.h
+++ b/cpp/src/cwrapper/tsfile_cwrapper.h
@@ -72,8 +72,6 @@ typedef enum column_category { TAG = 0, FIELD = 1 }
ColumnCategory;
typedef struct column_schema {
char* column_name;
TSDataType data_type;
- CompressionType compression;
- TSEncoding encoding;
ColumnCategory column_category;
} ColumnSchema;
@@ -151,6 +149,25 @@ WriteFile write_file_new(const char* pathname, ERRNO*
err_code);
TsFileWriter tsfile_writer_new(WriteFile file, TableSchema* schema,
ERRNO* err_code);
+/**
+ * @brief Creates a TsFileWriter for writing a TsFile.
+ *
+ * @param file Target file where the table data will be written.
+ * @param schema Table schema definition.
+ * - Ownership: retained by the caller, who must free it.
+ * @param memory_threshold used to limit the memory size
+ * of objects. If set to 0, no memory limit is enforced.
+ * @param err_code [out] E_OK(0), or check error code in errno_define_c.h.
+ *
+ * @return TsFileWriter Valid handle on success, NULL on failure.
+ *
+ * @note Call tsfile_writer_close() to release resources.
+ */
+TsFileWriter tsfile_writer_new_with_memory_threshold(WriteFile file,
+ TableSchema* schema,
+ uint64_t memory_threshold,
+ ERRNO* err_code);
+
/**
* @brief Creates a TsFileReader for reading a TsFile.
*
@@ -491,7 +508,6 @@ void free_table_schema(TableSchema schema);
void free_column_schema(ColumnSchema schema);
void free_write_file(WriteFile* write_file);
-
// ---------- !For Python API! ----------
/** WARN! Temporary internal method/interface.
@@ -500,7 +516,7 @@ void free_write_file(WriteFile* write_file);
// Create a tsfile writer.
TsFileWriter _tsfile_writer_new(const char* pathname, ERRNO* err_code);
- // Create a tablet will name, data_type and max_rows.
+// Create a tablet with name, data_type and max_rows.
Tablet _tablet_new_with_target_name(const char* device_id,
char** column_name_list,
TSDataType* data_types, int column_num,
diff --git a/cpp/src/encoding/ts2diff_decoder.h
b/cpp/src/encoding/ts2diff_decoder.h
index aa740fa7..5ad0e89c 100644
--- a/cpp/src/encoding/ts2diff_decoder.h
+++ b/cpp/src/encoding/ts2diff_decoder.h
@@ -49,8 +49,6 @@ class TS2DIFFDecoder : public Decoder {
}
FORCE_INLINE bool has_remaining() {
- // std::cout << "has_remaining, current_index_=" << current_index_ <<
",
- // write_index_=" << write_index_ << std::endl;
return bits_left_ != 0 || (current_index_ <= write_index_ &&
write_index_ != -1 && current_index_ != 0);
}
@@ -129,7 +127,11 @@ inline int32_t
TS2DIFFDecoder<int32_t>::decode(common::ByteStream &in) {
ret_value = first_value_;
bits_left_ = 0;
buffer_ = 0;
- current_index_ = 1;
+ if (write_index_ == 0) {
+ current_index_ = 0;
+ } else {
+ current_index_ = 1;
+ }
return ret_value;
}
if (current_index_++ >= write_index_) {
@@ -140,7 +142,6 @@ inline int32_t
TS2DIFFDecoder<int32_t>::decode(common::ByteStream &in) {
stored_value_ = read_long(bit_width_, in);
ret_value = stored_value_ + first_value_ + delta_min_;
first_value_ = ret_value;
-
return ret_value;
}
@@ -152,7 +153,11 @@ inline int64_t
TS2DIFFDecoder<int64_t>::decode(common::ByteStream &in) {
common::SerializationUtil::read_i64(delta_min_, in);
common::SerializationUtil::read_i64(first_value_, in);
ret_value = first_value_;
- current_index_ = 1;
+ if (write_index_ == 0) {
+ current_index_ = 0;
+ } else {
+ current_index_ = 1;
+ }
return ret_value;
}
if (current_index_++ >= write_index_) {
@@ -161,11 +166,6 @@ inline int64_t
TS2DIFFDecoder<int64_t>::decode(common::ByteStream &in) {
stored_value_ = (int64_t)read_long(bit_width_, in);
ret_value = stored_value_ + first_value_ + delta_min_;
first_value_ = ret_value;
-
- // std::cout << "decode, current_index_=" << current_index_ << ",
- // write_index_=" << write_index_ << ", ret_value=" << ret_value <<
- // std::endl;
-
return ret_value;
}
diff --git a/cpp/src/file/tsfile_io_writer.cc b/cpp/src/file/tsfile_io_writer.cc
index 6364adf8..af4c29dc 100644
--- a/cpp/src/file/tsfile_io_writer.cc
+++ b/cpp/src/file/tsfile_io_writer.cc
@@ -113,6 +113,7 @@ int TsFileIOWriter::start_flush_chunk_group(
if (ret != common::E_OK) {
return ret;
}
+ is_aligned_ = is_aligned;
cur_device_name_ = device_name;
ASSERT(cur_chunk_group_meta_ == nullptr);
use_prev_alloc_cgm_ = false;
@@ -140,9 +141,9 @@ int TsFileIOWriter::start_flush_chunk(ByteStream
&chunk_data,
ColumnSchema &col_schema,
int32_t num_of_pages) {
std::string measurement_name = col_schema.get_measurement_name_str();
- return start_flush_chunk(chunk_data, measurement_name,
col_schema.data_type_,
- col_schema.encoding_, col_schema.compression_,
- num_of_pages);
+ return start_flush_chunk(chunk_data, measurement_name,
+ col_schema.data_type_, col_schema.encoding_,
+ col_schema.compression_, num_of_pages);
}
int TsFileIOWriter::start_flush_chunk(common::ByteStream &chunk_data,
@@ -183,6 +184,11 @@ int TsFileIOWriter::start_flush_chunk(common::ByteStream
&chunk_data,
chunk_header.chunk_type_ =
(num_of_pages <= 1 ? ONLY_ONE_PAGE_CHUNK_HEADER_MARKER
: CHUNK_HEADER_MARKER);
+ if (is_aligned_) {
+ chunk_header.chunk_type_ |= (data_type == common::VECTOR)
+ ? TIME_COLUMN_MASK
+ : VALUE_COLUMN_MASK;
+ }
OFFSET_DEBUG("start flush chunk header");
ret = chunk_header.serialize_to(write_stream_);
OFFSET_DEBUG("after flush chunk header");
@@ -284,8 +290,8 @@ int TsFileIOWriter::write_log_index_range() {
#if DEBUG_SE
void debug_print_chunk_group_meta(ChunkGroupMeta *cgm) {
- std::cout << "ChunkGroupMeta = {device_id_="
- << cgm->device_id_ << ", chunk_meta_list_={";
+ std::cout << "ChunkGroupMeta = {device_id_=" << cgm->device_id_
+ << ", chunk_meta_list_={";
SimpleList<ChunkMeta *>::Iterator cm_it = cgm->chunk_meta_list_.begin();
for (; cm_it != cgm->chunk_meta_list_.end(); cm_it++) {
ChunkMeta *cm = cm_it.get();
@@ -397,12 +403,12 @@ int TsFileIOWriter::write_file_index() {
if (IS_SUCC(ret)) {
OFFSET_DEBUG("before ts_index written");
- if (ts_index.get_data_type() != common::VECTOR) {
- common::String tmp_device_name;
- tmp_device_name.dup_from(device_id->get_device_name(),
- meta_allocator_);
- ret = filter.add_path_entry(tmp_device_name, measurement_name);
- }
+ common::String tmp_device_name;
+ tmp_device_name.dup_from(device_id->get_device_name(),
+ meta_allocator_);
+ // The time column also needs to be added to the bloom filter.
+ ret = filter.add_path_entry(tmp_device_name, measurement_name);
+
if (RET_FAIL(ts_index.serialize_to(write_stream_))) {
} else {
@@ -447,7 +453,8 @@ int TsFileIOWriter::write_file_index() {
}
std::map<std::string, std::shared_ptr<MetaIndexNode>> table_nodes_map;
for (auto &entry : table_device_nodes_map) {
- auto meta_index_node =
std::make_shared<MetaIndexNode>(&meta_allocator_);
+ auto meta_index_node =
+ std::make_shared<MetaIndexNode>(&meta_allocator_);
build_device_level(entry.second, meta_index_node, writing_mm);
table_nodes_map[entry.first] = meta_index_node;
}
@@ -484,8 +491,9 @@ int TsFileIOWriter::build_device_level(DeviceNodeMap
&device_map,
FileIndexWritingMemManager &wmm) {
int ret = E_OK;
- SimpleList<std::shared_ptr<MetaIndexNode>> node_queue(1024,
- MOD_TSFILE_WRITER_META); // FIXME
+ SimpleList<std::shared_ptr<MetaIndexNode>> node_queue(
+ 1024,
+ MOD_TSFILE_WRITER_META); // FIXME
DeviceNodeMapIterator device_map_iter;
std::shared_ptr<MetaIndexNode> cur_index_node = nullptr;
@@ -510,8 +518,7 @@ int TsFileIOWriter::build_device_level(DeviceNodeMap
&device_map,
wmm, cur_index_node, LEAF_DEVICE))) {
}
}
- if (RET_FAIL(
- alloc_and_init_meta_index_entry(wmm, entry, device_id))) {
+ if (RET_FAIL(alloc_and_init_meta_index_entry(wmm, entry, device_id))) {
} else if (RET_FAIL(
device_map_iter->second->serialize_to(write_stream_))) {
} else if (RET_FAIL(cur_index_node->push_entry(entry))) {
@@ -594,8 +601,8 @@ int TsFileIOWriter::alloc_and_init_meta_index_entry(
}
auto entry_ptr = static_cast<DeviceMetaIndexEntry *>(buf);
new (entry_ptr) DeviceMetaIndexEntry(device_id, cur_file_position());
- ret_entry =
- std::shared_ptr<IMetaIndexEntry>(entry_ptr,
IMetaIndexEntry::self_destructor);
+ ret_entry = std::shared_ptr<IMetaIndexEntry>(
+ entry_ptr, IMetaIndexEntry::self_destructor);
#if DEBUG_SE
std::cout << "alloc_and_init_meta_index_entry, MetaIndexEntry="
<< *ret_entry << std::endl;
@@ -613,8 +620,8 @@ int TsFileIOWriter::alloc_and_init_meta_index_entry(
auto entry_ptr = static_cast<MeasurementMetaIndexEntry *>(buf);
new (entry_ptr)
MeasurementMetaIndexEntry(name, cur_file_position(), wmm.pa_);
- ret_entry =
- std::shared_ptr<IMetaIndexEntry>(entry_ptr,
IMetaIndexEntry::self_destructor);
+ ret_entry = std::shared_ptr<IMetaIndexEntry>(
+ entry_ptr, IMetaIndexEntry::self_destructor);
#if DEBUG_SE
std::cout << "alloc_and_init_meta_index_entry, MetaIndexEntry="
<< *ret_entry << std::endl;
@@ -625,17 +632,18 @@ int TsFileIOWriter::alloc_and_init_meta_index_entry(
int TsFileIOWriter::alloc_and_init_meta_index_node(
FileIndexWritingMemManager &wmm, std::shared_ptr<MetaIndexNode> &ret_node,
MetaIndexNodeType node_type) {
-// void *buf = wmm.pa_.alloc(sizeof(MetaIndexNode));
-// if (IS_NULL(buf)) {
-// return E_OOM;
-// }
-// auto *node_ptr = new (buf) MetaIndexNode(&wmm.pa_);
-// node_ptr->node_type_ = node_type;
-// ret_node = std::shared_ptr<MetaIndexNode>(node_ptr, [](MetaIndexNode
*ptr) {
-// if (ptr) {
-// ptr->~MetaIndexNode();
-// }
-// });
+ // void *buf = wmm.pa_.alloc(sizeof(MetaIndexNode));
+ // if (IS_NULL(buf)) {
+ // return E_OOM;
+ // }
+ // auto *node_ptr = new (buf) MetaIndexNode(&wmm.pa_);
+ // node_ptr->node_type_ = node_type;
+ // ret_node = std::shared_ptr<MetaIndexNode>(node_ptr, [](MetaIndexNode
+ // *ptr) {
+ // if (ptr) {
+ // ptr->~MetaIndexNode();
+ // }
+ // });
ret_node = std::make_shared<MetaIndexNode>(&wmm.pa_);
ret_node->node_type_ = node_type;
wmm.all_index_nodes_.push_back(ret_node);
@@ -647,7 +655,8 @@ int TsFileIOWriter::add_cur_index_node_to_queue(
SimpleList<std::shared_ptr<MetaIndexNode>> *queue) const {
node->end_offset_ = cur_file_position();
#if DEBUG_SE
- std::cout << "add_cur_index_node_to_queue, node=" << *node << ", set
offset=" << cur_file_position() << std::endl;
+ std::cout << "add_cur_index_node_to_queue, node=" << *node
+ << ", set offset=" << cur_file_position() << std::endl;
#endif
return queue->push_back(node);
}
@@ -799,7 +808,8 @@ int TsFileIOWriter::generate_root(
return ret;
}
-void
TsFileIOWriter::destroy_node_list(common::SimpleList<std::shared_ptr<MetaIndexNode>>
*list) {
+void TsFileIOWriter::destroy_node_list(
+ common::SimpleList<std::shared_ptr<MetaIndexNode>> *list) {
if (list) {
for (auto iter = list->begin(); iter != list->end(); iter++) {
if (iter.get()) {
@@ -807,7 +817,6 @@ void
TsFileIOWriter::destroy_node_list(common::SimpleList<std::shared_ptr<MetaIn
}
}
}
-
}
int TsFileIOWriter::clone_node_list(
diff --git a/cpp/src/file/tsfile_io_writer.h b/cpp/src/file/tsfile_io_writer.h
index d904cea3..4de4fce9 100644
--- a/cpp/src/file/tsfile_io_writer.h
+++ b/cpp/src/file/tsfile_io_writer.h
@@ -203,6 +203,7 @@ private:
std::string encrypt_level_;
std::string encrypt_type_;
std::string encrypt_key_;
+ bool is_aligned_;
};
} // end namespace storage
diff --git a/cpp/src/reader/aligned_chunk_reader.cc
b/cpp/src/reader/aligned_chunk_reader.cc
index 9884b459..e99973a8 100644
--- a/cpp/src/reader/aligned_chunk_reader.cc
+++ b/cpp/src/reader/aligned_chunk_reader.cc
@@ -362,8 +362,7 @@ int AlignedChunkReader::decode_cur_time_page_data() {
char *time_uncompressed_buf = nullptr;
uint32_t time_compressed_buf_size = 0;
uint32_t time_uncompressed_buf_size = 0;
- char *time_buf = nullptr;
- uint32_t time_buf_size = 0;
+
// Step 2: do uncompress
if (IS_SUCC(ret)) {
@@ -398,26 +397,12 @@ int AlignedChunkReader::decode_cur_time_page_data() {
}
}
- // Step 3: get time_buf
- if (IS_SUCC(ret)) {
- int var_size = 0;
- if (RET_FAIL(SerializationUtil::read_var_uint(
- time_buf_size, time_uncompressed_buf,
- time_uncompressed_buf_size, &var_size))) {
- } else {
- time_buf = time_uncompressed_buf + var_size;
- if (time_uncompressed_buf_size < var_size + time_buf_size) {
- ret = E_TSFILE_CORRUPTED;
- ASSERT(false);
- }
- }
- }
time_decoder_->reset();
#ifdef DEBUG_SE
DEBUG_hex_dump_buf("AlignedChunkReader reader, time_buf = ", time_buf,
time_buf_size);
#endif
- time_in_.wrap_from(time_buf, time_buf_size);
+ time_in_.wrap_from(time_uncompressed_buf_, time_uncompressed_buf_size);
return ret;
}
diff --git a/cpp/src/reader/aligned_chunk_reader.h
b/cpp/src/reader/aligned_chunk_reader.h
index 0f10184a..58898f7d 100644
--- a/cpp/src/reader/aligned_chunk_reader.h
+++ b/cpp/src/reader/aligned_chunk_reader.h
@@ -58,7 +58,7 @@ class AlignedChunkReader : public IChunkReader {
value_uncompressed_buf_(nullptr),
cur_value_index(-1) {}
int init(ReadFile *read_file, common::String m_name,
- common::TSDataType data_type, Filter *time_filter)
override;
+ common::TSDataType data_type, Filter *time_filter) override;
void reset() override;
void destroy() override;
~AlignedChunkReader() override = default;
@@ -71,15 +71,16 @@ class AlignedChunkReader : public IChunkReader {
}
ChunkHeader &get_chunk_header() override { return value_chunk_header_; }
int load_by_aligned_meta(ChunkMeta *time_meta,
- ChunkMeta *value_meta) override;
+ ChunkMeta *value_meta) override;
int get_next_page(common::TsBlock *tsblock, Filter *oneshoot_filter,
- common::PageArena &pa) override;
+ common::PageArena &pa) override;
private:
FORCE_INLINE bool chunk_has_only_one_page(
const ChunkHeader &chunk_header) const {
- return chunk_header.chunk_type_ == ONLY_ONE_PAGE_CHUNK_HEADER_MARKER;
+ return (chunk_header.chunk_type_ & ONLY_ONE_PAGE_CHUNK_HEADER_MARKER)
==
+ ONLY_ONE_PAGE_CHUNK_HEADER_MARKER;
}
int alloc_compressor_and_decoder(storage::Decoder *&decoder,
storage::Compressor *&compressor,
@@ -95,8 +96,7 @@ class AlignedChunkReader : public IChunkReader {
ChunkMeta *&chunk_meta,
uint32_t &chunk_visit_offset,
int32_t &file_data_buf_size,
- int want_size = 0,
- bool may_shrink = true);
+ int want_size = 0, bool may_shrink = true);
bool cur_page_statisify_filter(Filter *filter);
int skip_cur_page();
int decode_cur_time_page_data();
diff --git a/cpp/src/reader/bloom_filter.cc b/cpp/src/reader/bloom_filter.cc
index 1ff1109d..58348094 100644
--- a/cpp/src/reader/bloom_filter.cc
+++ b/cpp/src/reader/bloom_filter.cc
@@ -200,7 +200,7 @@ String BloomFilter::get_entry_string(const String
&device_name,
int BloomFilter::add_path_entry(const String &device_name,
const String &measurement_name) {
- if (device_name.is_null() || measurement_name.is_null()) {
+ if (device_name.is_null()) {
return E_INVALID_ARG;
}
diff --git a/cpp/src/reader/chunk_reader.h b/cpp/src/reader/chunk_reader.h
index 0d5b82d8..80a34164 100644
--- a/cpp/src/reader/chunk_reader.h
+++ b/cpp/src/reader/chunk_reader.h
@@ -72,7 +72,8 @@ class ChunkReader : public IChunkReader {
private:
FORCE_INLINE bool chunk_has_only_one_page() const {
- return chunk_header_.chunk_type_ == ONLY_ONE_PAGE_CHUNK_HEADER_MARKER;
+ return (chunk_header_.chunk_type_ & ONLY_ONE_PAGE_CHUNK_HEADER_MARKER)
==
+ ONLY_ONE_PAGE_CHUNK_HEADER_MARKER;
}
int alloc_compressor_and_value_decoder(
common::TSEncoding encoding, common::TSDataType data_type,
diff --git a/cpp/src/reader/table_query_executor.cc
b/cpp/src/reader/table_query_executor.cc
index df52105f..c46afdf4 100644
--- a/cpp/src/reader/table_query_executor.cc
+++ b/cpp/src/reader/table_query_executor.cc
@@ -29,11 +29,9 @@ int TableQueryExecutor::query(const std::string &table_name,
file_metadata = tsfile_io_reader_->get_tsfile_meta();
common::PageArena pa;
pa.init(512, common::MOD_TSFILE_READER);
- common::String table_name_str;
- table_name_str.dup_from(table_name, pa);
MetaIndexNode *table_root = nullptr;
std::shared_ptr<TableSchema> table_schema;
- if (RET_FAIL(file_metadata->get_table_metaindex_node(table_name_str,
+ if (RET_FAIL(file_metadata->get_table_metaindex_node(table_name,
table_root))) {
} else if (RET_FAIL(
file_metadata->get_table_schema(table_name, table_schema)))
{
diff --git a/cpp/src/reader/table_result_set.cc
b/cpp/src/reader/table_result_set.cc
index 6fb8a58a..23e35aaa 100644
--- a/cpp/src/reader/table_result_set.cc
+++ b/cpp/src/reader/table_result_set.cc
@@ -18,6 +18,8 @@
*/
#include "reader/table_result_set.h"
+#include <utils/storage_utils.h>
+
namespace storage {
void TableResultSet::init() {
row_record_ = new RowRecord(column_names_.size() + 1);
@@ -62,7 +64,7 @@ int TableResultSet::next(bool& has_next) {
if (row_iterator_ == nullptr || !row_iterator_->has_next()) {
has_next = false;
}
-
+
if (has_next && IS_SUCC(ret)) {
uint32_t len = 0;
bool null = false;
@@ -78,7 +80,7 @@ int TableResultSet::next(bool& has_next) {
}
bool TableResultSet::is_null(const std::string& column_name) {
- auto iter = index_lookup_.find(column_name);
+ auto iter = index_lookup_.find(to_lower(column_name));
if (iter == index_lookup_.end()) {
return true;
} else {
@@ -88,7 +90,8 @@ bool TableResultSet::is_null(const std::string& column_name) {
bool TableResultSet::is_null(uint32_t column_index) {
ASSERT(1 <= column_index && column_index <= row_record_->get_col_num());
- return row_record_->get_field(column_index - 1) == nullptr;
+ return row_record_->get_field(column_index - 1) == nullptr ||
+ row_record_->get_field(column_index -
1)->is_type(common::NULL_TYPE);
}
RowRecord* TableResultSet::get_row_record() { return row_record_; }
diff --git a/cpp/src/reader/tsfile_reader.cc b/cpp/src/reader/tsfile_reader.cc
index faad6448..162fb6ad 100644
--- a/cpp/src/reader/tsfile_reader.cc
+++ b/cpp/src/reader/tsfile_reader.cc
@@ -93,15 +93,19 @@ int TsFileReader::query(const std::string &table_name,
return E_TSFILE_WRITER_META_ERR;
}
std::shared_ptr<TableSchema> table_schema =
- tsfile_meta->table_schemas_.at(table_name);
+ tsfile_meta->table_schemas_.at(to_lower(table_name));
if (table_schema == nullptr) {
return E_TABLE_NOT_EXIST;
}
+ std::vector<std::string> columns_names_lowercase(columns_names);
+ for (auto &column_name : columns_names_lowercase) {
+ to_lowercase_inplace(column_name);
+ }
std::vector<TSDataType> data_types = table_schema->get_data_types();
Filter* time_filter = new TimeBetween(start_time, end_time, false);
- ret = table_query_executor_->query(table_name, columns_names, time_filter,
nullptr, nullptr, result_set);
+ ret = table_query_executor_->query(to_lower(table_name),
columns_names_lowercase, time_filter, nullptr, nullptr, result_set);
return ret;
}
@@ -116,6 +120,7 @@ std::vector<std::shared_ptr<IDeviceID>>
TsFileReader::get_all_devices(
if (tsfile_meta != nullptr) {
PageArena pa;
pa.init(512, MOD_TSFILE_READER);
+ to_lowercase_inplace(table_name);
auto index_node =
tsfile_meta->table_metadata_index_node_map_[table_name];
get_all_devices(device_ids, index_node, pa);
@@ -198,13 +203,12 @@ ResultSet* TsFileReader::read_timeseries(
std::shared_ptr<TableSchema> TsFileReader::get_table_schema(const std::string
&table_name) {
TsFileMeta *file_metadata = tsfile_executor_->get_tsfile_meta();
- common::String table_name_str(table_name);
MetaIndexNode *table_root = nullptr;
std::shared_ptr<TableSchema> table_schema;
- if (IS_FAIL(file_metadata->get_table_metaindex_node(table_name_str,
+ if (IS_FAIL(file_metadata->get_table_metaindex_node(to_lower(table_name),
table_root))) {
} else if (IS_FAIL(
- file_metadata->get_table_schema(table_name, table_schema)))
{
+ file_metadata->get_table_schema(to_lower(table_name),
table_schema))) {
}
return table_schema;
}
diff --git a/cpp/src/utils/errno_define.h b/cpp/src/utils/errno_define.h
index 1112e90f..8d87ade9 100644
--- a/cpp/src/utils/errno_define.h
+++ b/cpp/src/utils/errno_define.h
@@ -28,8 +28,7 @@ const int E_ALREADY_EXIST = 3;
const int E_INVALID_ARG = 4;
const int E_OUT_OF_RANGE = 5;
const int E_PARTIAL_READ = 6;
-const int E_NET_BIND_ERR = 7;
-const int E_NET_SOCKET_ERR = 8;
+const int E_INVALID_SCHEMA = 8;
const int E_NET_EPOLL_ERR = 9;
const int E_NET_EPOLL_WAIT_ERR = 10;
const int E_NET_RECV_ERR = 11;
diff --git a/cpp/src/utils/storage_utils.h b/cpp/src/utils/storage_utils.h
index 53e45400..f4d596b0 100644
--- a/cpp/src/utils/storage_utils.h
+++ b/cpp/src/utils/storage_utils.h
@@ -21,6 +21,7 @@
#include <inttypes.h>
#include <stdint.h>
+#include <algorithm>
#include "common/datatype/value.h"
#include "common/tsblock/tsblock.h"
@@ -76,6 +77,19 @@ FORCE_INLINE std::string get_file_path_from_file_id(
return oss.str();
}
+/**
+ * Lower-cases `str` in place, one byte at a time, using std::tolower.
+ *
+ * Each char is converted to unsigned char before the call so that bytes with
+ * the high bit set are never handed to std::tolower as negative values.
+ *
+ * @param str String to modify.
+ */
+static void to_lowercase_inplace(std::string &str) {
+    for (char &ch : str) {
+        const unsigned char byte = static_cast<unsigned char>(ch);
+        ch = static_cast<char>(static_cast<unsigned char>(std::tolower(byte)));
+    }
+}
+/**
+ * Returns a lower-cased copy of `str`; the argument itself is not modified.
+ *
+ * Conversion is done byte by byte with std::tolower, going through
+ * unsigned char so that high-bit bytes are not passed as negative values.
+ *
+ * @param str Source string.
+ * @return Copy of `str` with every byte mapped through std::tolower.
+ */
+static std::string to_lower(const std::string &str) {
+    std::string lowered(str);
+    for (char &ch : lowered) {
+        const unsigned char byte = static_cast<unsigned char>(ch);
+        ch = static_cast<char>(static_cast<unsigned char>(std::tolower(byte)));
+    }
+    return lowered;
+}
+
} // end namespace storage
#endif // UTILS_STORAGE_UTILS_H
diff --git a/cpp/src/writer/time_page_writer.cc
b/cpp/src/writer/time_page_writer.cc
index 2ac75315..55450a1d 100644
--- a/cpp/src/writer/time_page_writer.cc
+++ b/cpp/src/writer/time_page_writer.cc
@@ -31,22 +31,20 @@ namespace storage {
int TimePageData::init(ByteStream &time_bs, Compressor *compressor) {
int ret = E_OK;
time_buf_size_ = time_bs.total_size();
- uint32_t var_size = get_var_uint_size(time_buf_size_);
- uncompressed_size_ = var_size + time_buf_size_;
+ uncompressed_size_ = time_buf_size_;
uncompressed_buf_ =
(char *)mem_alloc(uncompressed_size_, MOD_PAGE_WRITER_OUTPUT_STREAM);
compressor_ = compressor;
if (IS_NULL(uncompressed_buf_)) {
return E_OOM;
}
+
if (time_buf_size_ == 0) {
return E_INVALID_ARG;
}
- if (RET_FAIL(SerializationUtil::write_var_uint(
- time_buf_size_, uncompressed_buf_, var_size))) {
- } else if (RET_FAIL(
- common::copy_bs_to_buf(time_bs, uncompressed_buf_ +
var_size,
- uncompressed_size_ - var_size))) {
+ // TODO: Maybe use time_bs as compressed_buf.
+ if (RET_FAIL(common::copy_bs_to_buf(time_bs, uncompressed_buf_,
+ uncompressed_size_))) {
} else {
// TODO
// NOTE: different compressor may have different compress API
diff --git a/cpp/src/writer/tsfile_table_writer.cc
b/cpp/src/writer/tsfile_table_writer.cc
index 73af7727..6cbb0c9b 100644
--- a/cpp/src/writer/tsfile_table_writer.cc
+++ b/cpp/src/writer/tsfile_table_writer.cc
@@ -34,6 +34,18 @@ int storage::TsFileTableWriter::write_table(storage::Tablet&
tablet) const {
} else if (!exclusive_table_name_.empty() && tablet.get_table_name() !=
exclusive_table_name_) {
return common::E_TABLE_NOT_EXIST;
}
+ tablet.set_table_name(to_lower(tablet.get_table_name()));
+ for (int i = 0; i < tablet.get_column_count(); i++) {
+ tablet.set_column_name(i, to_lower(tablet.get_column_name(i)));
+ }
+
+ auto schema_map = tablet.get_schema_map();
+ std::map<std::string, int> schema_map_;
+ for (auto iter = schema_map.begin(); iter != schema_map.end(); iter++) {
+ schema_map_[to_lower(iter->first)] = iter->second;
+ }
+ tablet.set_schema_map(schema_map_);
+
return tsfile_writer_->write_table(tablet);
}
diff --git a/cpp/src/writer/value_page_writer.h
b/cpp/src/writer/value_page_writer.h
index 649c00c3..9cf44aec 100644
--- a/cpp/src/writer/value_page_writer.h
+++ b/cpp/src/writer/value_page_writer.h
@@ -70,10 +70,10 @@ struct ValuePageData {
ret = common::E_TYPE_NOT_MATCH; \
return ret; \
} \
+ if ((size_ / 8) + 1 > col_notnull_bitmap_.size()) { \
+ col_notnull_bitmap_.push_back(0); \
+ } \
if (!ISNULL) { \
- if ((size_ / 8) + 1 > col_notnull_bitmap_.size()) { \
- col_notnull_bitmap_.push_back(0); \
- } \
col_notnull_bitmap_[size_ / 8] |= (MASK >> (size_ % 8)); \
} \
size_++; \
diff --git a/cpp/test/cwrapper/c_release_test.cc
b/cpp/test/cwrapper/c_release_test.cc
new file mode 100644
index 00000000..ef50f00b
--- /dev/null
+++ b/cpp/test/cwrapper/c_release_test.cc
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <gtest/gtest.h>
+#include <unistd.h>
+#include <utils/db_utils.h>
+extern "C" {
+#include "cwrapper/errno_define_c.h"
+#include "cwrapper/tsfile_cwrapper.h"
+}
+
+#include "common/tablet.h"
+#include "utils/errno_define.h"
+namespace CReleaseTest {
+class CReleaseTest : public testing::Test {};
+
+TEST_F(CReleaseTest, TestCreateFile) {  // write_file_new(): new path, existing path, directory path
+ ERRNO error_no = RET_OK;
+ // Creating a file at a fresh path must report RET_OK.
+ WriteFile file = write_file_new("create_file1.tsfile", &error_no);
+ ASSERT_EQ(RET_OK, error_no);
+ free_write_file(&file);
+
+ // Creating the same path again must report RET_ALREADY_EXIST and return a null handle.
+ file = write_file_new("create_file1.tsfile", &error_no);
+ ASSERT_EQ(RET_ALREADY_EXIST, error_no);
+ ASSERT_EQ(nullptr, file);
+
+ // A path with a trailing '/' must report RET_FILRET_OPEN_ERR.
+ file = write_file_new("test/", &error_no);
+ ASSERT_EQ(RET_FILRET_OPEN_ERR, error_no);
+
+ remove("create_file1.tsfile");  // clean up the file created by the first step
+ free_write_file(&file);  // NOTE(review): file comes from a failed open here; confirm free_write_file() accepts that
+}
+
+// Covers tsfile_writer_new() and tsfile_writer_new_with_memory_threshold():
+// a schema with zero columns must yield RET_INVALID_SCHEMA and a null writer,
+// and a schema with one TAG and one FIELD column must yield a writer that
+// closes with RET_OK.
+TEST_F(CReleaseTest, TsFileWriterNew) {
+    ERRNO error_code = RET_OK;
+
+    TableSchema test_schema;
+    test_schema.table_name = strdup("test_table");
+    test_schema.column_num = 0;
+    // Never leave this pointer indeterminate: the struct is passed by value
+    // to free_table_schema() at the end of the test.
+    test_schema.column_schemas = nullptr;
+
+    // Invalid schema
+    WriteFile file = write_file_new("test_empty_schema.tsfile", &error_code);
+    ASSERT_EQ(RET_OK, error_code);
+    TsFileWriter writer = tsfile_writer_new(file, &test_schema, &error_code);
+    ASSERT_EQ(RET_INVALID_SCHEMA, error_code);
+    ASSERT_EQ(nullptr, writer);
+    // Closing the null writer is expected to report RET_OK.
+    ASSERT_EQ(RET_OK, tsfile_writer_close(writer));
+    free_write_file(&file);
+    // free_write_file() is expected to reset the caller's handle.
+    ASSERT_EQ(nullptr, file);
+    remove("test_empty_schema.tsfile");
+
+    // Invalid schema with memory threshold
+    file = write_file_new("test_empty_schema_memory_threshold.tsfile",
+                          &error_code);
+    ASSERT_EQ(RET_OK, error_code);
+    // Invalid schema
+    writer = tsfile_writer_new_with_memory_threshold(file, &test_schema, 100,
+                                                     &error_code);
+    ASSERT_EQ(RET_INVALID_SCHEMA, error_code);
+    ASSERT_EQ(nullptr, writer);
+    ASSERT_EQ(RET_OK, tsfile_writer_close(writer));
+    free_write_file(&file);
+    ASSERT_EQ(nullptr, file);
+    remove("test_empty_schema_memory_threshold.tsfile");
+
+    // Normal schema
+    file = write_file_new("test_empty_writer.tsfile", &error_code);
+    ASSERT_EQ(RET_OK, error_code);
+
+    TableSchema table_schema;
+    table_schema.table_name = strdup("test_table");
+    table_schema.column_num = 2;
+    table_schema.column_schemas =
+        static_cast<ColumnSchema *>(malloc(sizeof(ColumnSchema) * 2));
+    table_schema.column_schemas[0] =
+        (ColumnSchema){.column_name = strdup("col1"),
+                       .data_type = TS_DATATYPE_STRING,
+                       .column_category = TAG};
+    table_schema.column_schemas[1] =
+        (ColumnSchema){.column_name = strdup("col2"),
+                       .data_type = TS_DATATYPE_INT32,
+                       .column_category = FIELD};
+
+    writer = tsfile_writer_new(file, &table_schema, &error_code);
+    ASSERT_EQ(RET_OK, error_code);
+    error_code = tsfile_writer_close(writer);
+    ASSERT_EQ(RET_OK, error_code);
+    free_write_file(&file);
+    remove("test_empty_writer.tsfile");
+
+    free_table_schema(table_schema);
+    free_table_schema(test_schema);
+}
+
+TEST_F(CReleaseTest, TsFileWriterWriteDataAbnormalColumn) {  // schema validation, then a 100-row write/read round trip
+ ERRNO error_code = RET_OK;
+ WriteFile file = write_file_new(
+ "TsFileWriterWriteDataAbnormalColumn_3_100.tsfile", &error_code);  // NOTE(review): error_code is not asserted after this call
+
+ TableSchema abnormal_schema;
+ abnormal_schema.table_name = strdup("!@#$%^*()_+-=");  // table and column names made of punctuation only
+ abnormal_schema.column_num = 3;
+ abnormal_schema.column_schemas =
+ static_cast<ColumnSchema *>(malloc(sizeof(ColumnSchema) * 4));  // NOTE(review): room for 4 entries, column_num is 3
+ abnormal_schema.column_schemas[0] =
+ (ColumnSchema){.column_name = strdup("!@#$%^*()_+-="),
+ .data_type = TS_DATATYPE_STRING,
+ .column_category = TAG};
+
+ // Invalid on purpose: a TAG column declared as INT32 (rejected further down).
+ abnormal_schema.column_schemas[1] =
+ (ColumnSchema){.column_name = strdup("TAG2"),
+ .data_type = TS_DATATYPE_INT32,
+ .column_category = TAG};
+
+ // Invalid on purpose: reuses the name of column 0.
+ abnormal_schema.column_schemas[2] =
+ (ColumnSchema){.column_name = strdup("!@#$%^*()_+-="),
+ .data_type = TS_DATATYPE_DOUBLE,
+ .column_category = FIELD};
+
+ // Duplicate name and INT32 tag are both present: creation must fail with RET_INVALID_SCHEMA.
+ TsFileWriter writer =
+ tsfile_writer_new(file, &abnormal_schema, &error_code);
+ ASSERT_EQ(RET_INVALID_SCHEMA, error_code);
+ free(abnormal_schema.column_schemas[2].column_name);  // release the duplicate name before the entry is overwritten
+
+ abnormal_schema.column_schemas[2] =
+ (ColumnSchema){.column_name = strdup("!@#$%^*()_+-=1"),
+ .data_type = TS_DATATYPE_DOUBLE,
+ .column_category = FIELD};
+
+ // Names are unique now, but TAG2 is still INT32: creation must fail again.
+ writer = tsfile_writer_new(file, &abnormal_schema, &error_code);
+ ASSERT_EQ(RET_INVALID_SCHEMA, error_code);
+
+ free(abnormal_schema.column_schemas[1].column_name);  // release the old name before the entry is overwritten
+ abnormal_schema.column_schemas[1] =
+ (ColumnSchema){.column_name = strdup("TAG2"),
+ .data_type = TS_DATATYPE_STRING,
+ .column_category = TAG};
+
+ writer = tsfile_writer_new(file, &abnormal_schema, &error_code);
+ ASSERT_EQ(RET_OK, error_code);  // accepted once TAG2 is a STRING tag
+
+ char **column_list = static_cast<char **>(malloc(sizeof(char *) * 3));
+ column_list[0] = strdup("!@#$%^*()_+-=");
+ column_list[1] = strdup("TAG2");
+ column_list[2] = strdup("!@#$%^*()_+-=1");
+ TSDataType *type_list =
+ static_cast<TSDataType *>(malloc(sizeof(TSDataType) * 3));
+ type_list[0] = TS_DATATYPE_STRING;
+ type_list[1] = TS_DATATYPE_STRING;
+ type_list[2] = TS_DATATYPE_DOUBLE;
+ Tablet tablet = tablet_new(column_list, type_list, 3, 100);  // presumably (names, types, column count, row capacity) - confirm
+ for (int i = 0; i < 100; i++) {
+ tablet_add_timestamp(tablet, i, static_cast<int64_t>(i));
+ tablet_add_value_by_name_string(tablet, i, "!@#$%^*()_+-=", "device1");
+ tablet_add_value_by_index_string(
+ tablet, i, 1, std::string("sensor" + std::to_string(i)).c_str());
+ tablet_add_value_by_name_double(tablet, i, "!@#$%^*()_+-=1", i *
100.0);
+ }
+ ASSERT_EQ(RET_OK, tsfile_writer_write(writer, tablet));
+ ASSERT_EQ(RET_OK, tsfile_writer_close(writer));
+ free_write_file(&file);
+
+ TsFileReader reader = tsfile_reader_new(
+ "TsFileWriterWriteDataAbnormalColumn_3_100.tsfile", &error_code);
+ ASSERT_EQ(RET_OK, error_code);
+ int i = 0;  // number of rows read back
+ ResultSet result_set = tsfile_query_table(
+ reader, "!@#$%^*()_+-=", column_list, 3, 0, 100, &error_code);
+ while (tsfile_result_set_next(result_set, &error_code) &&
+ error_code == RET_OK) {
+ Timestamp timestamp =
+ tsfile_result_set_get_value_by_name_int64_t(result_set, "time");
+ ASSERT_EQ(timestamp * 100.0,
tsfile_result_set_get_value_by_name_double(
+ result_set, "!@#$%^*()_+-=1"));
+ char *value_str =
+ tsfile_result_set_get_value_by_index_string(result_set, 2);  // NOTE(review): index 2 is expected to hold the first tag; looks 1-based with time first - confirm
+ ASSERT_EQ("device1", std::string(value_str));
+ free(value_str);
+ i++;
+ }
+ ASSERT_EQ(100, i);
+ for (int i = 0; i < 3; i++) {  // this i shadows the row counter declared above
+ free(column_list[i]);
+ }
+ free(column_list);
+ free(type_list);
+ free_write_file(&file);  // NOTE(review): file was already released right after the write above
+ free_table_schema(abnormal_schema);
+ free_tablet(&tablet);
+ free_tsfile_result_set(&result_set);
+ tsfile_reader_close(reader);
+ remove("TsFileWriterWriteDataAbnormalColumn_3_100.tsfile");
+}
+
+TEST_F(CReleaseTest, TsFileWriterMultiDataType) {  // round trip of six column types, including unset cells
+ ERRNO error_code = RET_OK;
+ WriteFile file = write_file_new(
+ "TsFileWriterMultiDataType.tsfile", &error_code);
+ ASSERT_EQ(RET_OK, error_code);
+ TableSchema all_type_schema;
+ all_type_schema.table_name = strdup("All_Datatype");  // mixed case here; queried below as "all_datatype"
+ all_type_schema.column_num = 6;
+ all_type_schema.column_schemas =
+ static_cast<ColumnSchema *>(malloc(sizeof(ColumnSchema) * 6));
+ all_type_schema.column_schemas[0] =
+ (ColumnSchema){.column_name = strdup("TAG"),
+ .data_type = TS_DATATYPE_STRING,
+ .column_category = TAG};
+ all_type_schema.column_schemas[1] =
+ (ColumnSchema){.column_name = strdup("INT32"),
+ .data_type = TS_DATATYPE_INT32,
+ .column_category = FIELD};
+ all_type_schema.column_schemas[2] =
+ (ColumnSchema){.column_name = strdup("INT64"),
+ .data_type = TS_DATATYPE_INT64,
+ .column_category = FIELD};
+ all_type_schema.column_schemas[3] =
+ (ColumnSchema){.column_name = strdup("FLOAT"),
+ .data_type = TS_DATATYPE_FLOAT,
+ .column_category = FIELD};
+ all_type_schema.column_schemas[4] =
+ (ColumnSchema){.column_name = strdup("DOUBLE"),
+ .data_type = TS_DATATYPE_DOUBLE,
+ .column_category = FIELD};
+ all_type_schema.column_schemas[5] =
+ (ColumnSchema){.column_name = strdup("BOOLEAN"),
+ .data_type = TS_DATATYPE_BOOLEAN,
+ .column_category = FIELD};
+
+ TsFileWriter writer =
+ tsfile_writer_new(file, &all_type_schema, &error_code);
+ ASSERT_EQ(RET_OK, error_code);
+
+ free_table_schema(all_type_schema);  // the schema struct is not referenced again in this test
+ char **column_list = static_cast<char **>(malloc(sizeof(char *) * 6));
+ column_list[0] = strdup("TAG");
+ column_list[1] = strdup("INT32");
+ column_list[2] = strdup("INT64");
+ column_list[3] = strdup("FLOAT");
+ column_list[4] = strdup("DOUBLE");
+ column_list[5] = strdup("BOOLEAN");
+ TSDataType *type_list =
+ static_cast<TSDataType *>(malloc(sizeof(TSDataType) * 6));
+ type_list[0] = TS_DATATYPE_STRING;
+ type_list[1] = TS_DATATYPE_INT32;
+ type_list[2] = TS_DATATYPE_INT64;
+ type_list[3] = TS_DATATYPE_FLOAT;
+ type_list[4] = TS_DATATYPE_DOUBLE;
+ type_list[5] = TS_DATATYPE_BOOLEAN;
+ Tablet tablet = tablet_new(column_list, type_list, 6, 1000);
+ for (int i = 0; i < 1000; i++) {
+ // Timestamps run from -10 to 989, so negative values are included.
+ tablet_add_timestamp(tablet, i, static_cast<int64_t>(i - 10));
+ tablet_add_value_by_name_string(tablet, i, "TAG", "device1");
+ tablet_add_value_by_name_int32_t(tablet, i, "INT32", i);
+ tablet_add_value_by_index_int64_t(tablet, i, 2, i * 100);
+ tablet_add_value_by_index_float(tablet, i, 3, i * 100.0);
+ if (i > 900) {  // rows 901..999 skip the two calls below
+ continue;
+ }
+ // Only rows 0..900 receive DOUBLE and BOOLEAN values; the read loop expects DOUBLE to be null for the rest.
+ tablet_add_value_by_index_double(tablet, i, 4, i * 100.0);
+ tablet_add_value_by_index_bool(tablet, i, 5, i % 2 == 0);
+ }
+ ASSERT_EQ(RET_OK, tsfile_writer_write(writer, tablet));
+ ASSERT_EQ(RET_OK, tsfile_writer_close(writer));
+ free_write_file(&file);
+
+ TsFileReader reader = tsfile_reader_new(
+ "TsFileWriterMultiDataType.tsfile", &error_code);
+ ASSERT_EQ(RET_OK, error_code);
+ ResultSet result_set = tsfile_query_table(
+ reader, "all_datatype", column_list, 6, 0, 1000, &error_code);  // NOTE(review): written timestamps start at -10, yet 1000 rows are expected below - confirm how the 0/1000 arguments are applied
+ int row_num = 0;
+ while (tsfile_result_set_next(result_set, &error_code) &&
+ error_code == RET_OK) {
+ Timestamp timestamp =
+ tsfile_result_set_get_value_by_name_int64_t(result_set, "time");
+ int64_t value = timestamp + 10;  // recovers the row index i used when writing
+ char *str_value =
+ tsfile_result_set_get_value_by_name_string(result_set, "TAG");
+ ASSERT_EQ("device1", std::string(str_value));
+ free(str_value);
+ ASSERT_EQ(value,
tsfile_result_set_get_value_by_name_int32_t(result_set,
+ "int32"));  // lower-case lookup of a column written as "INT32"
+ ASSERT_EQ(value * 100, tsfile_result_set_get_value_by_name_int64_t(
+ result_set, "int64"));  // lower-case lookup of a column written as "INT64"
+ ASSERT_EQ(value * 100.0, tsfile_result_set_get_value_by_name_float(
+ result_set, "FLOAT"));
+
+ if (value <= 900) {
+ ASSERT_EQ(value * 100.0,
tsfile_result_set_get_value_by_name_double(
+ result_set, "DOUBLE"));
+ ASSERT_EQ(value % 2 == 0, tsfile_result_set_get_value_by_name_bool(
+ result_set, "BOOLEAN"));
+ } else {
+ ASSERT_TRUE(
+ tsfile_result_set_is_null_by_name(result_set, "DOUBLE"));  // rows written without a DOUBLE value must read back as null
+ }
+ row_num++;
+ }
+ ASSERT_EQ(1000, row_num);
+ free_tsfile_result_set(&result_set);
+ tsfile_reader_close(reader);
+ for (int i = 0; i < 6; i++) {
+ free(column_list[i]);
+ }
+ free_tablet(&tablet);
+ free(column_list);
+ free(type_list);
+ remove("TsFileWriterMultiDataType.tsfile");
+}
+
+} // namespace CReleaseTest
\ No newline at end of file
diff --git a/cpp/test/cwrapper/cwrapper_test.cc
b/cpp/test/cwrapper/cwrapper_test.cc
index 0cd6b325..60eeabda 100644
--- a/cpp/test/cwrapper/cwrapper_test.cc
+++ b/cpp/test/cwrapper/cwrapper_test.cc
@@ -60,14 +60,12 @@ TEST_F(CWrapperTest, WriterFlushTabletAndReadData) {
for (int i = 0; i < id_schema_num; i++) {
schema.column_schemas[i] =
ColumnSchema{strdup(std::string("id" + std::to_string(i)).c_str()),
- TS_DATATYPE_STRING, TS_COMPRESSION_UNCOMPRESSED,
- TS_ENCODING_PLAIN, TAG};
+ TS_DATATYPE_STRING, TAG};
}
for (int i = 0; i < field_schema_num; i++) {
schema.column_schemas[i + id_schema_num] =
ColumnSchema{strdup(std::string("s" + std::to_string(i)).c_str()),
- TS_DATATYPE_INT64, TS_COMPRESSION_UNCOMPRESSED,
- TS_ENCODING_PLAIN, FIELD};
+ TS_DATATYPE_INT64, FIELD};
}
WriteFile file =
write_file_new("cwrapper_write_flush_and_read.tsfile", &code);