jt2594838 commented on code in PR #304:
URL: https://github.com/apache/tsfile/pull/304#discussion_r1865197048
##########
cpp/src/common/tablet.h:
##########
@@ -20,64 +20,85 @@
#ifndef COMMON_TABLET_H
#define COMMON_TABLET_H
+#include <algorithm>
+#include <memory>
#include <vector>
#include "common/container/bit_map.h"
#include "schema.h"
namespace storage {
+template <typename T>
+
class TabletRowIterator;
class TabletColIterator;
class Tablet {
public:
- static const int DEFAULT_MAX_ROWS = 1024;
+ static const uint32_t DEFAULT_MAX_ROWS = 1024;
public:
- Tablet(const std::string &device_name,
- const std::vector<MeasurementSchema> *schema_vec,
- int max_rows = DEFAULT_MAX_ROWS)
- : max_rows_(max_rows),
- device_name_(device_name),
+ Tablet(const std::string &device_id,
+ std::shared_ptr<std::vector<MeasurementSchema>> schema_vec,
+ uint32_t max_rows = DEFAULT_MAX_ROWS)
+ : max_row_num_(max_rows),
+ device_id_(device_id),
schema_vec_(schema_vec),
timestamps_(NULL),
value_matrix_(NULL),
bitmaps_(NULL) {
- ASSERT(device_name.size() >= 1);
+ ASSERT(device_id.size() >= 1);
ASSERT(schema_vec != NULL);
ASSERT(max_rows > 0 && max_rows < (1 << 30));
if (max_rows < 0) {
ASSERT(false);
- max_rows_ = DEFAULT_MAX_ROWS;
+ max_row_num_ = DEFAULT_MAX_ROWS;
}
Review Comment:
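Should not change max_rows to an unsigned type (uint32_t):
1. the assertions and checks below (e.g. `max_rows > 0`, `if (max_rows < 0)`) become meaningless, because an unsigned value can never be negative;
2. other languages, like Java, do not support array sizes larger than the maximum value of a signed 32-bit integer.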
##########
cpp/src/reader/tsfile_reader.cc:
##########
@@ -37,29 +54,74 @@ TsFileReader::~TsFileReader() {
delete read_file_;
read_file_ = nullptr;
}
+ return ret; // TO DO
Review Comment:
TODO what? Please describe what remains to be done here, or remove the comment.
##########
cpp/src/reader/tsfile_reader.cc:
##########
@@ -37,29 +54,74 @@ TsFileReader::~TsFileReader() {
delete read_file_;
read_file_ = nullptr;
}
+ return ret; // TO DO
}
-int TsFileReader::open(const std::string &file_path) {
+int TsFileReader::query(QueryExpression *qe, ResultSet *&ret_qds) {
+ return tsfile_executor_->execute(qe, ret_qds);
+}
+
+int TsFileReader::query(std::vector<std::string> &path_list, int64_t
start_time,
+ int64_t end_time, ResultSet *&result_set) {
int ret = E_OK;
- read_file_ = new storage::ReadFile;
- tsfile_executor_ = new storage::TsFileExecutor();
- if (RET_FAIL(read_file_->open(file_path))) {
- std::cout << "filed to open file " << ret << std::endl;
- } else if (RET_FAIL(tsfile_executor_->init(read_file_))) {
- std::cout << "filed to init " << ret << std::endl;
+ Filter *time_filter = new TimeBetween(start_time, end_time, false);
+ Expression *exp =
+ new storage::Expression(storage::GLOBALTIME_EXPR, time_filter);
+ std::vector<Path> path_list_vec;
+ for (const auto &path : path_list) {
+ uint32_t last_point_pos = path.find_last_of('.');
+ if (last_point_pos <= 0) {
+ return E_INVALID_PATH;
+ }
+ std::string device_name = path.substr(0, last_point_pos);
+ std::string measurement_name =
+ path.substr(last_point_pos + 1, path.size() - last_point_pos);
+ path_list_vec.emplace_back(Path(device_name, measurement_name));
}
+ QueryExpression *query_expression =
+ QueryExpression::create(path_list_vec, exp);
+ ret = tsfile_executor_->execute(query_expression, result_set);
return ret;
}
-int TsFileReader::query(QueryExpression *qe, QueryDataSet *&ret_qds) {
- return tsfile_executor_->execute(qe, ret_qds);
+void TsFileReader::destroy_query_data_set(storage::ResultSet *qds) {
+ tsfile_executor_->destroy_query_data_set(qds);
}
-void TsFileReader::destroy_query_data_set(storage::QueryDataSet *qds) {
- tsfile_executor_->destroy_query_data_set(qds);
+std::vector<std::string> TsFileReader::get_all_devices() {
+ TsFileMeta *tsfile_meta = tsfile_executor_->get_tsfile_meta();
+ std::vector<std::string> device_ids;
+ if (tsfile_meta != nullptr) {
+ device_ids.reserve(tsfile_meta->index_node_->children_.size());
+ for (const auto &meta_index_entry :
+ tsfile_meta->index_node_->children_) {
+ device_ids.push_back(meta_index_entry->name_.to_std_string());
+ }
+ }
+ return device_ids;
Review Comment:
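`tsfile_meta->index_node_->children_` may contain only internal nodes, whose
entries do not represent all devices. When the index has more than one level, the
tree must be traversed down to the leaf device nodes to collect every device.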
##########
cpp/src/file/tsfile_io_reader.cc:
##########
@@ -457,6 +517,81 @@ int TsFileIOReader::do_load_timeseries_index(
return ret;
}
+int TsFileIOReader::do_load_all_timeseries_index(
+ std::vector<std::pair<MetaIndexEntry *, int64_t>> &index_node_entry_list,
+ common::PageArena &in_timeseries_index_pa,
+ std::vector<ITimeseriesIndex *> &ts_indexs) {
+ int ret = E_OK;
+ for (const auto &index_node_entry : index_node_entry_list) {
+ int64_t start_offset = index_node_entry.first->offset_,
+ end_offset = index_node_entry.second;
+ std::cout << index_node_entry.first->name_ << std::endl;
+ const std::string target_measurement_name(
+ index_node_entry.first->name_.to_std_string());
+ ITimeseriesIndex *ts_idx;
+ ret = do_load_timeseries_index(target_measurement_name, start_offset,
+ end_offset, in_timeseries_index_pa,
+ ts_idx);
+ if (IS_SUCC(ret)) {
+ ts_indexs.push_back(ts_idx);
+ }
+ }
+ return ret;
+}
+
+int TsFileIOReader::get_all_leaf(
+ MetaIndexNode *index_node,
+ std::vector<std::pair<MetaIndexEntry *, int64_t>> &index_node_entry_list) {
+ int ret = E_OK;
+ if (index_node->node_type_ == LEAF_MEASUREMENT ||
+ index_node->node_type_ == LEAF_DEVICE) {
+ for (size_t i = 0; i < index_node->children_.size(); i++) {
+ if (i + 1 < index_node->children_.size()) {
+ index_node_entry_list.push_back(
+ std::make_pair(index_node->children_[i],
+ index_node->children_[i + 1]->offset_));
+ } else {
+ index_node_entry_list.push_back(std::make_pair(
+ index_node->children_[i], index_node->end_offset_));
+ }
+ }
+ } else {
+ // reader next level index node
Review Comment:
Typo: "reader" -> "read".
##########
cpp/src/file/tsfile_io_reader.cc:
##########
@@ -457,6 +517,81 @@ int TsFileIOReader::do_load_timeseries_index(
return ret;
}
+int TsFileIOReader::do_load_all_timeseries_index(
+ std::vector<std::pair<MetaIndexEntry *, int64_t>> &index_node_entry_list,
+ common::PageArena &in_timeseries_index_pa,
+ std::vector<ITimeseriesIndex *> &ts_indexs) {
+ int ret = E_OK;
+ for (const auto &index_node_entry : index_node_entry_list) {
+ int64_t start_offset = index_node_entry.first->offset_,
+ end_offset = index_node_entry.second;
+ std::cout << index_node_entry.first->name_ << std::endl;
Review Comment:
This is debug output (`std::cout`); please remove it.
##########
cpp/test/reader/tsfile_reader_test.cc:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "reader/tsfile_reader.h"
+
+#include <gtest/gtest.h>
+
+#include <random>
+#include <vector>
+
+#include "common/path.h"
+#include "common/record.h"
+#include "common/schema.h"
+#include "common/tablet.h"
+#include "file/tsfile_io_writer.h"
+#include "file/write_file.h"
+#include "reader/qds_without_timegenerator.h"
+#include "writer/tsfile_writer.h"
+
+using namespace storage;
+using namespace common;
+
+class TsFileReaderTest : public ::testing::Test {
+ protected:
+ void SetUp() override {
+ tsfile_writer_ = new TsFileWriter();
+ libtsfile_init();
+ file_name_ = std::string("tsfile_writer_test_") +
+ generate_random_string(10) + std::string(".tsfile");
+ remove(file_name_.c_str());
+ int flags = O_WRONLY | O_CREAT | O_TRUNC;
+#ifdef _WIN32
+ flags |= O_BINARY;
+#endif
+ mode_t mode = 0666;
+ EXPECT_EQ(tsfile_writer_->open(file_name_, flags, mode), common::E_OK);
+ }
+ void TearDown() override {
+ delete tsfile_writer_;
+ remove(file_name_.c_str());
+ }
+
+ std::string file_name_;
+ TsFileWriter *tsfile_writer_ = nullptr;
+
+ public:
+ static std::string generate_random_string(int length) {
+ std::random_device rd;
+ std::mt19937 gen(rd());
+ std::uniform_int_distribution<> dis(0, 61);
+
+ const std::string chars =
+ "0123456789"
+ "abcdefghijklmnopqrstuvwxyz"
+ "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+
+ std::string random_string;
+
+ for (int i = 0; i < length; ++i) {
+ random_string += chars[dis(gen)];
+ }
+
+ return random_string;
+ }
+
+ static std::string field_to_string(storage::Field *value) {
+ if (value->type_ == common::TEXT) {
+ return std::string(value->value_.sval_);
+ } else {
+ std::stringstream ss;
+ switch (value->type_) {
+ case common::BOOLEAN:
+ ss << (value->value_.bval_ ? "true" : "false");
+ break;
+ case common::INT32:
+ ss << value->value_.ival_;
+ break;
+ case common::INT64:
+ ss << value->value_.lval_;
+ break;
+ case common::FLOAT:
+ ss << value->value_.fval_;
+ break;
+ case common::DOUBLE:
+ ss << value->value_.dval_;
+ break;
+ case common::NULL_TYPE:
+ ss << "NULL";
+ break;
+ default:
+ ASSERT(false);
+ break;
+ }
+ return ss.str();
+ }
+ }
+};
+
+TEST_F(TsFileReaderTest, ResultSetMetadata) {
+ std::string device_path = "device1";
+ std::string measurement_name = "temperature";
+ common::TSDataType data_type = common::TSDataType::INT32;
+ common::TSEncoding encoding = common::TSEncoding::PLAIN;
+ common::CompressionType compression_type =
+ common::CompressionType::UNCOMPRESSED;
+ tsfile_writer_->register_timeseries(
+ device_path, storage::MeasurementSchema(measurement_name, data_type,
+ encoding, compression_type));
+
+ for (int i = 0; i < 50000; ++i) {
+ TsRecord record(1622505600000 + i * 1000, device_path);
+ record.add_point(measurement_name, (int32_t)i);
+ ASSERT_EQ(tsfile_writer_->write_record(record), E_OK);
+ ASSERT_EQ(tsfile_writer_->flush(), E_OK);
+ }
+ ASSERT_EQ(tsfile_writer_->close(), E_OK);
+
+ std::vector<std::string> select_list = {"device1.temperature"};
+
+ storage::TsFileReader reader;
+ int ret = reader.open(file_name_);
+ ASSERT_EQ(ret, common::E_OK);
+ storage::ResultSet *tmp_qds = nullptr;
+
+ ret = reader.query(select_list, 1622505600000, 1622505600000 + 50000 *
1000,
+ tmp_qds);
+ auto *qds = (QDSWithoutTimeGenerator *)tmp_qds;
+
+ ResultSetMetadata *result_set_metadaa = qds->get_metadata();
+ ASSERT_EQ(result_set_metadaa->get_column_type(0), data_type);
+ ASSERT_EQ(result_set_metadaa->get_column_name(0),
+ device_path + "." + measurement_name);
+ reader.destroy_query_data_set(qds);
+ reader.close();
+}
+
+TEST_F(TsFileReaderTest, GetAllDevice) {
Review Comment:
Add tests with enough devices/measurements to generate internal nodes in the
metadata index, so that `get_all_devices()` is verified against a multi-level index tree.
##########
cpp/src/reader/tsfile_reader.cc:
##########
@@ -37,29 +54,74 @@ TsFileReader::~TsFileReader() {
delete read_file_;
read_file_ = nullptr;
}
+ return ret; // TO DO
}
-int TsFileReader::open(const std::string &file_path) {
+int TsFileReader::query(QueryExpression *qe, ResultSet *&ret_qds) {
+ return tsfile_executor_->execute(qe, ret_qds);
+}
+
+int TsFileReader::query(std::vector<std::string> &path_list, int64_t
start_time,
+ int64_t end_time, ResultSet *&result_set) {
int ret = E_OK;
- read_file_ = new storage::ReadFile;
- tsfile_executor_ = new storage::TsFileExecutor();
- if (RET_FAIL(read_file_->open(file_path))) {
- std::cout << "filed to open file " << ret << std::endl;
- } else if (RET_FAIL(tsfile_executor_->init(read_file_))) {
- std::cout << "filed to init " << ret << std::endl;
+ Filter *time_filter = new TimeBetween(start_time, end_time, false);
+ Expression *exp =
+ new storage::Expression(storage::GLOBALTIME_EXPR, time_filter);
+ std::vector<Path> path_list_vec;
+ for (const auto &path : path_list) {
+ uint32_t last_point_pos = path.find_last_of('.');
+ if (last_point_pos <= 0) {
+ return E_INVALID_PATH;
+ }
+ std::string device_name = path.substr(0, last_point_pos);
+ std::string measurement_name =
+ path.substr(last_point_pos + 1, path.size() - last_point_pos);
+ path_list_vec.emplace_back(Path(device_name, measurement_name));
Review Comment:
This may not be so simple: a path segment may contain "." when it is quoted with
"\`".
For example, a path may be "root.db1.d1.\`s1.1\`", where the measurement name is "\`s1.1\`".
In this case, splitting at the last "." is invalid.
##########
cpp/src/reader/tsfile_reader.cc:
##########
@@ -37,29 +54,74 @@ TsFileReader::~TsFileReader() {
delete read_file_;
read_file_ = nullptr;
}
+ return ret; // TO DO
}
-int TsFileReader::open(const std::string &file_path) {
+int TsFileReader::query(QueryExpression *qe, ResultSet *&ret_qds) {
+ return tsfile_executor_->execute(qe, ret_qds);
+}
+
+int TsFileReader::query(std::vector<std::string> &path_list, int64_t
start_time,
+ int64_t end_time, ResultSet *&result_set) {
int ret = E_OK;
- read_file_ = new storage::ReadFile;
- tsfile_executor_ = new storage::TsFileExecutor();
- if (RET_FAIL(read_file_->open(file_path))) {
- std::cout << "filed to open file " << ret << std::endl;
- } else if (RET_FAIL(tsfile_executor_->init(read_file_))) {
- std::cout << "filed to init " << ret << std::endl;
+ Filter *time_filter = new TimeBetween(start_time, end_time, false);
+ Expression *exp =
+ new storage::Expression(storage::GLOBALTIME_EXPR, time_filter);
+ std::vector<Path> path_list_vec;
+ for (const auto &path : path_list) {
+ uint32_t last_point_pos = path.find_last_of('.');
+ if (last_point_pos <= 0) {
+ return E_INVALID_PATH;
+ }
+ std::string device_name = path.substr(0, last_point_pos);
+ std::string measurement_name =
+ path.substr(last_point_pos + 1, path.size() - last_point_pos);
+ path_list_vec.emplace_back(Path(device_name, measurement_name));
}
+ QueryExpression *query_expression =
+ QueryExpression::create(path_list_vec, exp);
+ ret = tsfile_executor_->execute(query_expression, result_set);
return ret;
}
-int TsFileReader::query(QueryExpression *qe, QueryDataSet *&ret_qds) {
- return tsfile_executor_->execute(qe, ret_qds);
+void TsFileReader::destroy_query_data_set(storage::ResultSet *qds) {
+ tsfile_executor_->destroy_query_data_set(qds);
}
-void TsFileReader::destroy_query_data_set(storage::QueryDataSet *qds) {
- tsfile_executor_->destroy_query_data_set(qds);
+std::vector<std::string> TsFileReader::get_all_devices() {
+ TsFileMeta *tsfile_meta = tsfile_executor_->get_tsfile_meta();
+ std::vector<std::string> device_ids;
+ if (tsfile_meta != nullptr) {
+ device_ids.reserve(tsfile_meta->index_node_->children_.size());
+ for (const auto &meta_index_entry :
+ tsfile_meta->index_node_->children_) {
+ device_ids.push_back(meta_index_entry->name_.to_std_string());
+ }
+ }
+ return device_ids;
+}
+
+int TsFileReader::get_timeseries_schema(
+ const std::string &device_id, std::vector<MeasurementSchema> &result) {
+ int ret = E_OK;
+ std::vector<ITimeseriesIndex *> timeseries_indexs;
+ PageArena pa;
+ pa.init(512, MOD_TSFILE_READER);
+ if (RET_FAIL(tsfile_executor_->get_tsfile_io_reader()
+ ->get_device_timeseries_meta_without_chunk_meta(
+ device_id, timeseries_indexs, pa))) {
+ } else {
+ for (auto timeseries_index : timeseries_indexs) {
+ MeasurementSchema ms(
+ timeseries_index->get_measurement_name().to_std_string(),
+ timeseries_index->get_data_type());
+ result.push_back(ms);
+ }
+ }
+ return E_OK;
Review Comment:
If the first `if` fails, `E_OK` is still returned and the error is silently swallowed, which is likely not correct; consider returning `ret` instead.
##########
cpp/src/cwrapper/TsFile-cwrapper.cc:
##########
@@ -689,17 +690,17 @@ ErrorCode destory_query_dataret(QueryDataRet data) {
}
DataResult* ts_next(QueryDataRet data, int expect_line_count) {
- storage::QueryDataSet* qds = (storage::QueryDataSet*)data->data;
+ storage::ResultSet* qds = (storage::ResultSet*)data->data;
DataResult* result = create_tablet("result", expect_line_count);
storage::RowRecord* record;
bool init_tablet = false;
for (int i = 0; i < expect_line_count; i++) {
- record = qds->get_next();
- if (record == nullptr) {
+ if (!qds->next()) {
break;
- std::cout << "record null now"
+ std::cout << "no more record now"
<< "i = " << i << std::endl;
}
Review Comment:
Remove: this `std::cout` comes after `break` and is therefore unreachable.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]