This is an automated email from the ASF dual-hosted git repository. colinlee pushed a commit to branch support_dataframe_to_tsfile in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit 77da19ed6f30c121fcaa641156c9577fc4f5c52a Author: ColinLee <[email protected]> AuthorDate: Thu Feb 12 21:15:40 2026 +0800 tmp code. --- cpp/src/common/tsblock/tuple_desc.h | 9 +++ .../reader/block/single_device_tsblock_reader.cc | 8 +++ cpp/src/reader/column_mapping.h | 11 ++- cpp/src/reader/table_query_executor.cc | 11 +-- .../reader/table_view/tsfile_reader_table_test.cc | 81 ++++++++++++++++++++++ .../writer/table_view/tsfile_writer_table_test.cc | 2 +- python/tsfile/schema.py | 7 ++ python/tsfile/tsfile_reader.pyx | 4 +- python/tsfile/utils.py | 36 +++++++--- 9 files changed, 151 insertions(+), 18 deletions(-) diff --git a/cpp/src/common/tsblock/tuple_desc.h b/cpp/src/common/tsblock/tuple_desc.h index 6010d677..3cd26b3f 100644 --- a/cpp/src/common/tsblock/tuple_desc.h +++ b/cpp/src/common/tsblock/tuple_desc.h @@ -76,6 +76,15 @@ class TupleDesc { return column_list_[index].column_category_; } + FORCE_INLINE int get_time_column_index() const { + for (uint32_t i = 0; i < column_list_.size(); i++) { + if (column_list_[i].get_column_category() == ColumnCategory::TIME) { + return i; + } + } + return -1; + } + FORCE_INLINE std::string get_column_name(uint32_t index) { return column_list_[index].column_name_; } diff --git a/cpp/src/reader/block/single_device_tsblock_reader.cc b/cpp/src/reader/block/single_device_tsblock_reader.cc index 0e2b350c..836ab695 100644 --- a/cpp/src/reader/block/single_device_tsblock_reader.cc +++ b/cpp/src/reader/block/single_device_tsblock_reader.cc @@ -164,6 +164,14 @@ int SingleDeviceTsBlockReader::fill_measurements( } col_appenders_[time_column_index_]->append((const char*)&next_time_, sizeof(next_time_)); + int time_in_query_index = tuple_desc_.get_time_column_index(); + if (time_in_query_index != -1) { + if (!col_appenders_[time_in_query_index]->add_row()) { + assert(false); + } + col_appenders_[time_in_query_index]->append( + (const char*)&next_time_, sizeof(next_time_)); + } for (auto& column_context : column_contexts) { column_context->fill_into(col_appenders_); if (RET_FAIL(advance_column(column_context))) { diff --git a/cpp/src/reader/column_mapping.h b/cpp/src/reader/column_mapping.h index abf9eafb..99e15303 100644 --- a/cpp/src/reader/column_mapping.h +++ b/cpp/src/reader/column_mapping.h @@ -36,8 +36,10 @@ class ColumnMapping { if (column_category == common::ColumnCategory::TAG) { tag_columns_.insert(column_name); - } else { + } else if (column_category == common::ColumnCategory::FIELD) { field_columns_.insert(column_name); + } else if (column_category == common::ColumnCategory::TIME) { + time_column_ = column_name; } return common::E_OK; @@ -64,6 +66,10 @@ class ColumnMapping { return field_columns_.find(column_name) != field_columns_.end(); } + bool is_time(const std::string& column_name) const { + return time_column_ == column_name; + } + const std::unordered_set<std::string>& get_id_columns() const { return tag_columns_; } @@ -72,8 +78,11 @@ class ColumnMapping { return field_columns_; } + const std::string get_time_column() const { return time_column_; } + private: std::unordered_map<std::string, std::vector<int>> column_pos_map; + std::string time_column_; std::unordered_set<std::string> tag_columns_; std::unordered_set<std::string> field_columns_; }; diff --git a/cpp/src/reader/table_query_executor.cc b/cpp/src/reader/table_query_executor.cc index 79b636b5..2a01a6d5 100644 --- a/cpp/src/reader/table_query_executor.cc +++ b/cpp/src/reader/table_query_executor.cc @@ -65,9 +65,10 @@ int TableQueryExecutor::query(const std::string& table_name, } // column_mapping.add(*measurement_filter); - auto device_task_iterator = std::unique_ptr<DeviceTaskIterator>( - new DeviceTaskIterator(columns, table_root, column_mapping, - meta_data_querier_, id_filter, table_schema)); + auto device_task_iterator = + std::unique_ptr<DeviceTaskIterator>(new DeviceTaskIterator( + lower_case_column_names, table_root, column_mapping, + meta_data_querier_, id_filter, table_schema)); std::unique_ptr<TsBlockReader> tsblock_reader; switch (table_query_ordering_) { @@ -82,8 +83,8 @@ int TableQueryExecutor::query(const std::string& table_name, ret = common::E_UNSUPPORTED_ORDER; } assert(tsblock_reader != nullptr); - ret_qds = - new TableResultSet(std::move(tsblock_reader), columns, data_types); + ret_qds = new TableResultSet(std::move(tsblock_reader), + lower_case_column_names, data_types); return ret; } diff --git a/cpp/test/reader/table_view/tsfile_reader_table_test.cc b/cpp/test/reader/table_view/tsfile_reader_table_test.cc index c281de41..b9f0eb21 100644 --- a/cpp/test/reader/table_view/tsfile_reader_table_test.cc +++ b/cpp/test/reader/table_view/tsfile_reader_table_test.cc @@ -707,3 +707,84 @@ TEST_F(TsFileTableReaderTest, TestNullInTable4) { ASSERT_EQ(line, max_rows); }); } + +TEST_F(TsFileTableReaderTest, TestTimeColumnReader) { + std::vector<common::ColumnSchema> column_schemas; + column_schemas.emplace_back("s0", TSDataType::INT64, + CompressionType::UNCOMPRESSED, + TSEncoding::PLAIN, ColumnCategory::FIELD); + column_schemas.emplace_back("S1", TSDataType::DOUBLE, + CompressionType::UNCOMPRESSED, + TSEncoding::PLAIN, ColumnCategory::FIELD); + // No need to manually insert data into the time column. + column_schemas.emplace_back("TIME_D", TSDataType::TIMESTAMP, + CompressionType::UNCOMPRESSED, + TSEncoding::PLAIN, ColumnCategory::TIME); + + TableSchema table_schema("testTableTime", column_schemas); + auto tsfile_table_writer_ = + std::make_shared<TsFileTableWriter>(&write_file_, &table_schema); + + const int num_rows = 20; + const int64_t base_time = 1000; + storage::Tablet tablet(table_schema.get_table_name(), {"s0", "s1"}, + {TSDataType::INT64, TSDataType::DOUBLE}, + {ColumnCategory::FIELD, ColumnCategory::FIELD}, + num_rows); + + for (int i = 0; i < num_rows; i++) { + int64_t t = base_time + i; + tablet.add_timestamp(i, t); + tablet.add_value(i, 0, static_cast<int64_t>(i * 10)); + tablet.add_value(i, 1, static_cast<double>(i * 1.5)); + } + + ASSERT_EQ(tsfile_table_writer_->write_table(tablet), common::E_OK); + ASSERT_EQ(tsfile_table_writer_->flush(), common::E_OK); + ASSERT_EQ(tsfile_table_writer_->close(), common::E_OK); + + storage::TsFileReader reader; + int ret = reader.open(file_name_); + ASSERT_EQ(ret, common::E_OK); + + ResultSet* tmp_result_set = nullptr; + ret = reader.query(table_schema.get_table_name(), {"s0", "s1", "TIME_D"}, 0, + 1000000000000, tmp_result_set); + ASSERT_EQ(ret, common::E_OK); + ASSERT_NE(tmp_result_set, nullptr); + + auto* table_result_set = dynamic_cast<TableResultSet*>(tmp_result_set); + ASSERT_NE(table_result_set, nullptr); + + auto result_set_metadata = table_result_set->get_metadata(); + ASSERT_EQ(result_set_metadata->get_column_count(), + 4); // time + s0 + s1 + TIME_D + ASSERT_EQ(result_set_metadata->get_column_name(1), "time"); + ASSERT_EQ(result_set_metadata->get_column_type(1), TSDataType::INT64); + ASSERT_EQ(result_set_metadata->get_column_name(2), "s0"); + ASSERT_EQ(result_set_metadata->get_column_type(2), TSDataType::INT64); + ASSERT_EQ(result_set_metadata->get_column_name(3), "s1"); + ASSERT_EQ(result_set_metadata->get_column_type(3), TSDataType::DOUBLE); + ASSERT_EQ(result_set_metadata->get_column_name(4), "time_d"); + ASSERT_EQ(result_set_metadata->get_column_type(4), TSDataType::TIMESTAMP); + + bool has_next = false; + int row_count = 0; + while (IS_SUCC(table_result_set->next(has_next)) && has_next) { + int64_t row_time = base_time + row_count; + // Column 1 is built-in time + ASSERT_EQ(table_result_set->get_value<int64_t>(1), row_time); + // s0, s1 + ASSERT_EQ(table_result_set->get_value<int64_t>(2), row_count * 10); + ASSERT_DOUBLE_EQ(table_result_set->get_value<double>(3), + static_cast<double>(row_count * 1.5)); + // time_d + ASSERT_EQ(table_result_set->get_value<int64_t>("TIME_D"), row_time); + ASSERT_EQ(table_result_set->get_value<int64_t>(4), row_time); + row_count++; + } + ASSERT_EQ(row_count, num_rows); + + reader.destroy_query_data_set(table_result_set); + ASSERT_EQ(reader.close(), common::E_OK); +} diff --git a/cpp/test/writer/table_view/tsfile_writer_table_test.cc b/cpp/test/writer/table_view/tsfile_writer_table_test.cc index d5861ea1..1f8c80ff 100644 --- a/cpp/test/writer/table_view/tsfile_writer_table_test.cc +++ b/cpp/test/writer/table_view/tsfile_writer_table_test.cc @@ -447,7 +447,7 @@ TEST_F(TsFileWriterTableTest, WriteAndReadSimple) { ASSERT_EQ(ret_value, 0); auto* table_result_set = (TableResultSet*)ret; auto metadata = ret->get_metadata(); - ASSERT_EQ(metadata->get_column_name(column_names.size() + 1), "VALUE"); + ASSERT_EQ(metadata->get_column_name(column_names.size() + 1), "value"); bool has_next = false; int cur_line = 0; while (IS_SUCC(table_result_set->next(has_next)) && has_next) { diff --git a/python/tsfile/schema.py b/python/tsfile/schema.py index f0fa39b1..d8671a33 100644 --- a/python/tsfile/schema.py +++ b/python/tsfile/schema.py @@ -197,6 +197,13 @@ class ResultSetMetaData: def set_table_name(self, table_name: str): self.table_name = table_name + def add_column_at(self, index: int, column_name: str, data_type: TSDataType): + """Insert a column and its data type at the given position (0-based index).""" + if index < 0 or index > len(self.column_list): + raise IndexError(f"column index {index} out of range (0 to {len(self.column_list)})") + self.column_list.insert(index, column_name) + self.data_types.insert(index, data_type) + def get_data_type(self, column_index: int) -> TSDataType: if column_index < 1 or column_index > len(self.column_list): raise OverflowError diff --git a/python/tsfile/tsfile_reader.pyx b/python/tsfile/tsfile_reader.pyx index 041764f9..4476d24d 100644 --- a/python/tsfile/tsfile_reader.pyx +++ b/python/tsfile/tsfile_reader.pyx @@ -19,7 +19,6 @@ #cython: language_level=3 import weakref -from email.contentmanager import raw_data_manager from typing import List import pandas as pd @@ -154,7 +153,6 @@ cdef class ResultSetPy: # Well when we check is null, id from 0, so there index -1. if tsfile_result_set_is_null_by_index(self.result, index): return None - # data type in metadata is an array, id from 0. data_type = self.metadata.get_data_type(index) if data_type == TSDataTypePy.INT32: return tsfile_result_set_get_value_by_index_int32_t(self.result, index) @@ -297,7 +295,7 @@ cdef class TsFileReaderPy: return pyresult def query_table_on_tree(self, column_names : List[str], - start_time : int = INT64_MIN, end_time : int = INT64_MAX) -> ResultSetPy: + start_time : int = INT64_MIN, end_time : int = INT64_MAX) -> ResultSetPy: """ Execute a time range query on specified columns on tree structure. :return: query result handler. diff --git a/python/tsfile/utils.py b/python/tsfile/utils.py index 71e21346..4366ef5b 100644 --- a/python/tsfile/utils.py +++ b/python/tsfile/utils.py @@ -99,6 +99,17 @@ def to_dataframe(file_path: str, _start_time = start_time if start_time is not None else np.iinfo(np.int64).min _end_time = end_time if end_time is not None else np.iinfo(np.int64).max + ## Time column handling (table model): + ## 1. Request has no column list (query all): + ## 1.1 TsFile has a time column in schema: query only non-time columns; then rename + ## the first column of the returned DataFrame to the schema time column name. + ## 1.2 TsFile has no time column in schema: query as-is; first column is "time". + ## 2. Request has a column list but no time column: + ## 2.1 TsFile has a time column in schema: query with requested columns; rename the + ## first column to the schema time column name. + ## 2.2 TsFile has no time column in schema: first column stays "time"; no rename. + ## 3. Request has a column list including the time column: + ## 3.1 Query with requested columns (including time); do not rename the first column. with TsFileReaderPy(file_path) as reader: total_rows = 0 table_schema = reader.get_all_table_schemas() @@ -117,11 +128,17 @@ def to_dataframe(file_path: str, raise TableNotExistError(_table_name) columns = table_schema[_table_name] - column_names_in_file = columns.get_column_names() + column_names_in_file = [] + time_column = None + for column in columns: + if column.get_category() == ColumnCategory.TIME: + time_column = column.get_column_name() + else: + column_names_in_file.append(column.get_column_name()) if _column_names is not None: for column in _column_names: - if column.lower() not in column_names_in_file: + if column.lower() not in column_names_in_file and column.lower() != time_column : raise ColumnNotExistError(column) else: _column_names = column_names_in_file @@ -136,18 +153,21 @@ def to_dataframe(file_path: str, with query_result as result: while result.next(): if max_row_num is None: - df = result.read_data_frame() + dataframe = result.read_data_frame() elif is_iterator: - df = result.read_data_frame(max_row_num) + dataframe = result.read_data_frame(max_row_num) else: remaining_rows = max_row_num - total_rows if remaining_rows <= 0: break - df = result.read_data_frame(remaining_rows) - if df is None or df.empty: + dataframe = result.read_data_frame(remaining_rows) + if dataframe is None or dataframe.empty: continue - total_rows += len(df) - yield df + total_rows += len(dataframe) + if time_column is not None: + if _column_names is None or time_column.lower() not in [c.lower() for c in _column_names]: + dataframe = dataframe.rename(columns={dataframe.columns[0]: time_column}) + yield dataframe if (not is_iterator) and max_row_num is not None and total_rows >= max_row_num: break
