This is an automated email from the ASF dual-hosted git repository. colinlee pushed a commit to branch hdf in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit c01caafa283d47f93c22368ff58720b79f5d1c3f Author: ColinLee <[email protected]> AuthorDate: Mon Mar 31 19:58:48 2025 +0800 tmp code. --- cpp/src/cwrapper/tsfile_cwrapper.cc | 25 ++++++++++-- cpp/src/writer/tsfile_writer.cc | 9 +++++ cpp/test/cwrapper/cwrapper_test.cc | 61 ++++++++++++++++++++++++++++ python/examples/example.py | 77 +++++++++++++++++++++++++----------- python/examples/table_data.tsfile | Bin 0 -> 454542 bytes python/tsfile/schema.py | 5 ++- python/tsfile/tablet.py | 4 ++ 7 files changed, 152 insertions(+), 29 deletions(-) diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc b/cpp/src/cwrapper/tsfile_cwrapper.cc index 371e8ced..0f424744 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.cc +++ b/cpp/src/cwrapper/tsfile_cwrapper.cc @@ -293,9 +293,13 @@ ResultSet tsfile_query_table(TsFileReader reader, const char *table_name, std::vector<std::string> column_names; for (uint32_t i = 0; i < column_num; i++) { column_names.emplace_back(columns[i]); + std::cout<<"begin to query" << columns[i]<<std::endl; } + std::cout<<"begin and end"<< start_time << "\t" << end_time<<std::endl; *err_code = r->query(table_name, column_names, start_time, end_time, table_result_set); + std::cout << "result is null ?" << (table_result_set == nullptr) << std::endl; + return table_result_set; } @@ -305,6 +309,7 @@ bool tsfile_result_set_next(ResultSet result_set, ERRNO *err_code) { int ret = common::E_OK; ret = r->next(has_next); *err_code = ret; + std::cout<<"has next ?, ret is ?"<<has_next<< " "<< ret<<std::endl; if (ret != common::E_OK) { return false; } @@ -603,14 +608,14 @@ ERRNO _tsfile_writer_register_table(TsFileWriter writer, TableSchema *schema) { for (int i = 0; i < schema->column_num; i++) { ColumnSchema *cur_schema = schema->column_schemas + i; measurement_schemas[i] = new storage::MeasurementSchema( - cur_schema->column_name, + storage::to_lower(cur_schema->column_name), static_cast<common::TSDataType>(cur_schema->data_type)); column_categories.push_back( static_cast<common::ColumnCategory>(cur_schema->column_category)); } auto tsfile_writer = static_cast<storage::TsFileWriter *>(writer); return tsfile_writer->register_table(std::make_shared<storage::TableSchema>( - schema->table_name, measurement_schemas, column_categories)); + storage::to_lower(schema->table_name), measurement_schemas, column_categories)); } ERRNO _tsfile_writer_register_timeseries(TsFileWriter writer, @@ -654,9 +659,23 @@ ERRNO _tsfile_writer_write_tablet(TsFileWriter writer, Tablet tablet) { return w->write_tablet(*tbl); } -ERRNO _tsfile_writer_write_table(TsFileWriter writer, Tablet tablet) { + ERRNO _tsfile_writer_write_table(TsFileWriter writer, Tablet tablet) { auto *w = static_cast<storage::TsFileWriter *>(writer); auto *tbl = static_cast<storage::Tablet *>(tablet); + + tbl->set_table_name(storage::to_lower(tbl->get_table_name())); + for (int i = 0; i < tbl->get_column_count(); i++) { + tbl->set_column_name(i, storage::to_lower(tbl->get_column_name(i))); + } + + auto schema_map = tbl->get_schema_map(); + std::map<std::string, int> schema_map_; + for (auto iter = schema_map.begin(); iter != schema_map.end(); iter++) { + schema_map_[storage::to_lower(iter->first)] = iter->second; + } + tbl->set_schema_map(schema_map_); + + return w->write_table(*tbl); } diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc index f289f875..f4b175df 100644 --- a/cpp/src/writer/tsfile_writer.cc +++ b/cpp/src/writer/tsfile_writer.cc @@ -703,6 +703,15 @@ int TsFileWriter::write_tablet(const Tablet &tablet) { int TsFileWriter::write_table(Tablet &tablet) { int ret = E_OK; + std::cout<<"Tablet target name is " << tablet.insert_target_name_<<std::endl; + for (int i = 0 ; i < tablet.get_column_count(); i++) { + std::cout<<"column is " << tablet.get_column_name(i) <<std::endl; + } + std::cout<<"time value is " << std::endl; + for (int i = 0 ; i < tablet.get_cur_row_size();i++) { + std::cout<<tablet.timestamps_[i] << std::endl; + } + if (io_writer_->get_schema()->table_schema_map_.find( tablet.insert_target_name_) == io_writer_->get_schema()->table_schema_map_.end()) { diff --git a/cpp/test/cwrapper/cwrapper_test.cc b/cpp/test/cwrapper/cwrapper_test.cc index 60eeabda..e63d0005 100644 --- a/cpp/test/cwrapper/cwrapper_test.cc +++ b/cpp/test/cwrapper/cwrapper_test.cc @@ -184,4 +184,65 @@ TEST_F(CWrapperTest, WriterFlushTabletAndReadData) { free_write_file(&file); } + +TEST_F(CWrapperTest, SimpleTest) { + TableSchema schema; + ERRNO code = 0; + schema.table_name = strdup("testtable0"); + schema.column_num = 3; + schema.column_schemas = + static_cast<ColumnSchema*>(malloc(3 * sizeof(ColumnSchema))); + schema.column_schemas[0].column_name = strdup("device_id"); + schema.column_schemas[0].column_category = TAG; + schema.column_schemas[0].data_type = TS_DATATYPE_STRING; + schema.column_schemas[1].column_name = strdup("Value"); + schema.column_schemas[1].column_category = FIELD; + schema.column_schemas[1].data_type = TS_DATATYPE_FLOAT; + schema.column_schemas[2].column_name = strdup("Flags"); + schema.column_schemas[2].column_category = FIELD; + schema.column_schemas[2].data_type = TS_DATATYPE_BOOLEAN; + + remove("cwrapper_write_flush_and_read.tsfile"); + TsFileWriter writer = _tsfile_writer_new("cwrapper_write_flush_and_read.tsfile", &code); + code = _tsfile_writer_register_table(writer, &schema); + ASSERT_EQ(code, RET_OK); + + char** column_names = + static_cast<char**>(malloc(3 * sizeof(char*))); + TSDataType* data_types = + static_cast<TSDataType*>(malloc(sizeof(TSDataType) * 3)); + column_names[0] = strdup("device_id"); + column_names[1] = strdup("Value"); + column_names[2] = strdup("Flags"); + data_types[0] = TS_DATATYPE_STRING; + data_types[1] = TS_DATATYPE_FLOAT; + data_types[2] = TS_DATATYPE_BOOLEAN; + + int time = 0; + for (int tablet_id = 0; tablet_id < 20; tablet_id++) { + Tablet tablet = _tablet_new_with_target_name("testtable0", column_names, data_types, 3, 1000); + for (int i = 0; i < 1000; i++) { + tablet_add_timestamp(tablet, i, time++); + tablet_add_value_by_name_string(tablet, i, "device_id", std::string("sensor"+ std::to_string(tablet_id)).c_str()); + tablet_add_value_by_name_float(tablet, i, "Value", time* 1.1); + tablet_add_value_by_name_bool(tablet, i, "Flags", true); + } + code = _tsfile_writer_write_table(writer, tablet); + ASSERT_EQ(code, RET_OK); + } + + + ASSERT_EQ(_tsfile_writer_close(writer), 0); + + + TsFileReader reader = tsfile_reader_new("cwrapper_write_flush_and_read.tsfile", &code); + ResultSet result_set = tsfile_query_table(reader, "testtable0", column_names, 3, INT64_MIN, INT64_MAX, &code); + while (tsfile_result_set_next(result_set, &code) && code == RET_OK) { + std::cout << tsfile_result_set_get_value_by_name_float(result_set, "Value"); + } + free_tsfile_result_set(&result_set); + tsfile_reader_close(reader); +} + + } // namespace cwrapper \ No newline at end of file diff --git a/python/examples/example.py b/python/examples/example.py index cd0e61e5..f74828e1 100644 --- a/python/examples/example.py +++ b/python/examples/example.py @@ -16,45 +16,74 @@ # under the License. import os +import h5py from tsfile import ColumnSchema, TableSchema from tsfile import Tablet from tsfile import TsFileTableWriter, TsFileReader, TSDataType, TSEncoding, Compressor, ColumnCategory -## Write +import numpy as np +from datetime import datetime, timedelta +import time + +## init a var to store the datas from each dataset +datas = {} + +with h5py.File("export-1808881-B-301A-2024-01-01-ZPPP-ZGGG.hdf5", "r") as file: + # get the group by name which is "recoreded parameters" + group = file["Recorded Parameters"] + # for each dataset in the group, print the name and datatype + for name, dataset in group.items(): + if dataset.dtype["Value"] == np.float32: + # store the whole dataset in the datas and the dataset type is compound which is time double, value float, flags int + # convert the dataset to a numpy array + datas[name] = dataset[:] + + group = file["Computed Parameters"] + # for each dataset in the group, print the name and datatype + for name, dataset in group.items(): + if dataset.dtype["Value"] == np.float32: + # store the whole dataset in the datas and the dataset type is compound which is time double, value float, flags int + # convert the dataset to a numpy array + datas[name] = dataset[:] + +print(len(datas)) +# print(datas) + +# starttime +start_time = time.time() + +# Write table_data_dir = os.path.join(os.path.dirname(__file__), "table_data.tsfile") if os.path.exists(table_data_dir): os.remove(table_data_dir) -column1 = ColumnSchema("id", TSDataType.STRING, ColumnCategory.TAG) -column2 = ColumnSchema("id2", TSDataType.STRING, ColumnCategory.TAG) -column3 = ColumnSchema("value", TSDataType.FLOAT, ColumnCategory.FIELD) -table_schema = TableSchema("test_table", columns=[column1, column2, column3]) - +column1 = ColumnSchema("name", TSDataType.STRING, ColumnCategory.TAG) +column2 = ColumnSchema("value", TSDataType.FLOAT, ColumnCategory.FIELD) +column3 = ColumnSchema("flags", TSDataType.BOOLEAN, ColumnCategory.FIELD) +table_schema = TableSchema("float_table", columns=[column1, column2, column3]) ### Free resource automatically with TsFileTableWriter(table_data_dir, table_schema) as writer: - tablet_row_num = 100 - tablet = Tablet( - ["id", "id2", "value"], - [TSDataType.STRING, TSDataType.STRING, TSDataType.FLOAT], - tablet_row_num) - - for i in range(tablet_row_num): - tablet.add_timestamp(i, i * 10) - tablet.add_value_by_name("id", i, "test1") - tablet.add_value_by_name("id2", i, "test" + str(i)) - tablet.add_value_by_index(2, i, i * 100.2) + for name, data in datas.items(): + tablet_row_num = len(data) + tablet = Tablet( + ["name", "value", "flags"], + [TSDataType.STRING, TSDataType.FLOAT, TSDataType.BOOLEAN], + tablet_row_num) + min_value, max_value = 0, 0 + # add data + for i in range(tablet_row_num): + tablet.add_value_by_name("name", i, name) + tablet.add_timestamp(i, (int)(data["Time"][i])) + tablet.add_value_by_name("value", i, float(data["Value"][i])) + tablet.add_value_by_name("flags", i, (bool)(data["Flags"][i] == 1)) - writer.write_table(tablet) + writer.write_table(tablet) ## Read - ### Free resource automatically with TsFileReader(table_data_dir) as reader: - with reader.query_table("test_table", ["id2", "value"], 0, 50) as result: + with reader.query_table("float_table", ["name", "value", "flags"]) as result: while result.next(): - print(result.get_value_by_name("id2")) - print(result.get_value_by_name("value")) - print(result.read_data_frame()) - + print(result.get_value_by_name("value")) \ No newline at end of file diff --git a/python/examples/table_data.tsfile b/python/examples/table_data.tsfile new file mode 100644 index 00000000..4d68c972 Binary files /dev/null and b/python/examples/table_data.tsfile differ diff --git a/python/tsfile/schema.py b/python/tsfile/schema.py index 65189b70..aeed710e 100644 --- a/python/tsfile/schema.py +++ b/python/tsfile/schema.py @@ -74,7 +74,7 @@ class ColumnSchema: data_type = None def __init__(self, column_name: str, data_type: TSDataType, category: ColumnCategory = ColumnCategory.FIELD): - self.column_name = column_name + self.column_name = column_name.lower() self.data_type = data_type self.category = category @@ -97,7 +97,8 @@ class TableSchema: columns = None def __init__(self, table_name: str, columns: List[ColumnSchema]): - self.table_name = table_name + + self.table_name = table_name.lower() self.columns = columns def get_table_name(self): diff --git a/python/tsfile/tablet.py b/python/tsfile/tablet.py index 6fb76ea2..48679dd7 100644 --- a/python/tsfile/tablet.py +++ b/python/tsfile/tablet.py @@ -47,6 +47,7 @@ class Tablet(object): ] self.target_name = None self.column_name_list = column_name_list + column_name_list_back = [name.lower() for name in column_name_list] self.type_list = type_list self.max_row_num = max_row_num @@ -98,8 +99,11 @@ class Tablet(object): self.timestamp_list = timestamp_list def add_timestamp(self, row_index: int, timestamp: int): + if timestamp in self.timestamp_list: + print("Warning: timestamp already in use") self.timestamp_list[row_index] = timestamp + def _check_numeric_range(self, value: Union[int, float], data_type: TSDataType): if math.isnan(value) or math.isinf(value): if data_type == TSDataType.INT32 or data_type == TSDataType.INT64:
