This is an automated email from the ASF dual-hosted git repository. hongzhigao pushed a commit to branch feat/c-python-timeseries-metadata in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit c4a1238a2fe70ce7b64d57e2055dbf36c28f3417 Author: 761417898 <[email protected]> AuthorDate: Fri Apr 3 19:21:21 2026 +0800 c and python get_metadata interface wrapper --- cpp/src/cwrapper/tsfile_cwrapper.cc | 236 ++++++++++++++++++++++++++++ cpp/src/cwrapper/tsfile_cwrapper.h | 78 +++++++++ cpp/test/cwrapper/cwrapper_metadata_test.cc | 139 ++++++++++++++++ python/tests/test_reader_metadata.py | 84 ++++++++++ python/tsfile/schema.py | 33 ++++ python/tsfile/tsfile_cpp.pxd | 39 +++++ python/tsfile/tsfile_py_cpp.pxd | 3 + python/tsfile/tsfile_py_cpp.pyx | 106 +++++++++++++ python/tsfile/tsfile_reader.pyx | 17 +- 9 files changed, 734 insertions(+), 1 deletion(-) diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc b/cpp/src/cwrapper/tsfile_cwrapper.cc index 8cf7b622..1f1010c8 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.cc +++ b/cpp/src/cwrapper/tsfile_cwrapper.cc @@ -26,8 +26,12 @@ #include <cstring> #include <set> +#include <vector> +#include "common/device_id.h" +#include "common/statistic.h" #include "common/tablet.h" +#include "common/tsfile_common.h" #include "reader/result_set.h" #include "reader/table_result_set.h" #include "reader/tsfile_reader.h" @@ -695,6 +699,238 @@ DeviceSchema* tsfile_reader_get_all_timeseries_schemas(TsFileReader reader, return device_schema; } +const DeviceID tsfile_c_metadata_empty_device_list_marker = {nullptr}; + +namespace { + +void clear_timeseries_statistic(TimeseriesStatistic* s) { + memset(s, 0, sizeof(*s)); +} + +void fill_timeseries_statistic(storage::Statistic* st, + TimeseriesStatistic* out) { + clear_timeseries_statistic(out); + if (st == nullptr) { + return; + } + out->has_statistic = true; + out->row_count = st->get_count(); + out->start_time = st->start_time_; + out->end_time = st->get_end_time(); + out->sum_valid = false; + out->sum = 0.0; + const common::TSDataType t = st->get_type(); + switch (t) { + case common::BOOLEAN: { + auto* bs = static_cast<storage::BooleanStatistic*>(st); + out->sum_valid = true; + out->sum = static_cast<double>(bs->sum_value_); + break; + } + case common::INT32: + case common::DATE: { + auto* is = static_cast<storage::Int32Statistic*>(st); + out->sum_valid = true; + out->sum = static_cast<double>(is->sum_value_); + break; + } + case common::INT64: + case common::TIMESTAMP: { + auto* ls = static_cast<storage::Int64Statistic*>(st); + out->sum_valid = true; + out->sum = ls->sum_value_; + break; + } + case common::FLOAT: { + auto* fs = static_cast<storage::FloatStatistic*>(st); + out->sum_valid = true; + out->sum = static_cast<double>(fs->sum_value_); + break; + } + case common::DOUBLE: { + auto* ds = static_cast<storage::DoubleStatistic*>(st); + out->sum_valid = true; + out->sum = ds->sum_value_; + break; + } + default: + break; + } +} + +void free_device_timeseries_metadata_entries_partial( + DeviceTimeseriesMetadataEntry* entries, size_t filled_count) { + if (entries == nullptr) { + return; + } + for (size_t i = 0; i < filled_count; i++) { + free(entries[i].device.path); + entries[i].device.path = nullptr; + if (entries[i].timeseries != nullptr) { + for (uint32_t j = 0; j < entries[i].timeseries_count; j++) { + free(entries[i].timeseries[j].measurement_name); + } + free(entries[i].timeseries); + entries[i].timeseries = nullptr; + } + } + free(entries); +} + +} // namespace + +ERRNO tsfile_reader_get_all_devices(TsFileReader reader, DeviceID** out_devices, + uint32_t* out_length) { + if (reader == nullptr || out_devices == nullptr || out_length == nullptr) { + return common::E_INVALID_ARG; + } + *out_devices = nullptr; + *out_length = 0; + auto* r = static_cast<storage::TsFileReader*>(reader); + const auto ids = r->get_all_devices(); + if (ids.empty()) { + return common::E_OK; + } + auto* arr = static_cast<DeviceID*>(malloc(sizeof(DeviceID) * ids.size())); + if (arr == nullptr) { + return common::E_OOM; + } + memset(arr, 0, sizeof(DeviceID) * ids.size()); + for (size_t i = 0; i < ids.size(); i++) { + const std::string name = + ids[i] ? ids[i]->get_device_name() : std::string(); + arr[i].path = strdup(name.c_str()); + if (arr[i].path == nullptr) { + tsfile_free_device_id_array(arr, static_cast<uint32_t>(i)); + return common::E_OOM; + } + } + *out_devices = arr; + *out_length = static_cast<uint32_t>(ids.size()); + return common::E_OK; +} + +void tsfile_free_device_id_array(DeviceID* devices, uint32_t length) { + if (devices == nullptr) { + return; + } + for (uint32_t i = 0; i < length; i++) { + free(devices[i].path); + devices[i].path = nullptr; + } + free(devices); +} + +ERRNO tsfile_reader_get_timeseries_metadata( + TsFileReader reader, const DeviceID* device_ids, uint32_t length, + DeviceTimeseriesMetadataMap* out_map) { + if (reader == nullptr || out_map == nullptr) { + return common::E_INVALID_ARG; + } + out_map->entries = nullptr; + out_map->device_count = 0; + auto* r = static_cast<storage::TsFileReader*>(reader); + storage::DeviceTimeseriesMetadataMap cpp_map; + if (device_ids == nullptr) { + cpp_map = r->get_timeseries_metadata(); + } else if (length == 0) { + return common::E_OK; + } else { + std::vector<std::shared_ptr<storage::IDeviceID>> query_ids; + query_ids.reserve(length); + for (uint32_t i = 0; i < length; i++) { + if (device_ids[i].path == nullptr) { + return common::E_INVALID_ARG; + } + query_ids.push_back(std::make_shared<storage::StringArrayDeviceID>( + std::string(device_ids[i].path))); + } + cpp_map = r->get_timeseries_metadata(query_ids); + } + if (cpp_map.empty()) { + return common::E_OK; + } + const uint32_t dev_n = static_cast<uint32_t>(cpp_map.size()); + auto* entries = static_cast<DeviceTimeseriesMetadataEntry*>( + malloc(sizeof(DeviceTimeseriesMetadataEntry) * dev_n)); + if (entries == nullptr) { + return common::E_OOM; + } + memset(entries, 0, sizeof(DeviceTimeseriesMetadataEntry) * dev_n); + size_t di = 0; + for (const auto& kv : cpp_map) { + DeviceTimeseriesMetadataEntry& e = entries[di]; + const std::string dname = + kv.first ? kv.first->get_device_name() : std::string(); + e.device.path = strdup(dname.c_str()); + if (e.device.path == nullptr) { + free_device_timeseries_metadata_entries_partial(entries, di); + return common::E_OOM; + } + const auto& vec = kv.second; + e.timeseries_count = static_cast<uint32_t>(vec.size()); + if (e.timeseries_count == 0) { + e.timeseries = nullptr; + di++; + continue; + } + e.timeseries = static_cast<TimeseriesMetadata*>( + malloc(sizeof(TimeseriesMetadata) * e.timeseries_count)); + if (e.timeseries == nullptr) { + free(e.device.path); + e.device.path = nullptr; + free_device_timeseries_metadata_entries_partial(entries, di); + return common::E_OOM; + } + memset(e.timeseries, 0, + sizeof(TimeseriesMetadata) * e.timeseries_count); + for (uint32_t ti = 0; ti < e.timeseries_count; ti++) { + const auto& idx = vec[ti]; + TimeseriesMetadata& m = e.timeseries[ti]; + if (idx == nullptr) { + continue; + } + common::String mn = idx->get_measurement_name(); + m.measurement_name = strdup(mn.to_std_string().c_str()); + if (m.measurement_name == nullptr) { + for (uint32_t u = 0; u <= ti; u++) { + free(e.timeseries[u].measurement_name); + } + free(e.timeseries); + e.timeseries = nullptr; + free(e.device.path); + e.device.path = nullptr; + free_device_timeseries_metadata_entries_partial(entries, di); + return common::E_OOM; + } + m.data_type = static_cast<TSDataType>(idx->get_data_type()); + storage::Statistic* st = idx->get_statistic(); + int32_t chunk_cnt = 0; + auto* cl = idx->get_chunk_meta_list(); + if (cl != nullptr) { + chunk_cnt = static_cast<int32_t>(cl->size()); + } + m.chunk_meta_count = chunk_cnt; + fill_timeseries_statistic(st, &m.statistic); + } + di++; + } + out_map->entries = entries; + out_map->device_count = dev_n; + return common::E_OK; +} + +void tsfile_free_device_timeseries_metadata_map( + DeviceTimeseriesMetadataMap* map) { + if (map == nullptr) { + return; + } + free_device_timeseries_metadata_entries_partial(map->entries, + map->device_count); + map->entries = nullptr; + map->device_count = 0; +} + // delete pointer void _free_tsfile_ts_record(TsRecord* record) { if (*record != nullptr) { diff --git a/cpp/src/cwrapper/tsfile_cwrapper.h b/cpp/src/cwrapper/tsfile_cwrapper.h index 4f4ce8d6..1cad8c47 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.h +++ b/cpp/src/cwrapper/tsfile_cwrapper.h @@ -104,6 +104,56 @@ typedef struct device_schema { int timeseries_num; } DeviceSchema; +/** + * @brief Device identifier for C API (canonical path string from IDeviceID). + */ +typedef struct DeviceID { + char* path; +} DeviceID; + +/** + * @brief Aggregated statistic for one timeseries (subset of C++ Statistic). + */ +typedef struct TimeseriesStatistic { + bool has_statistic; + int32_t row_count; + int64_t start_time; + int64_t end_time; + /** True when @p sum is meaningful (numeric / boolean aggregate types). */ + bool sum_valid; + /** Sum when sum_valid; boolean uses sum of true as int-like aggregate. */ + double sum; +} TimeseriesStatistic; + +/** + * @brief One measurement's metadata as exposed to C. + */ +typedef struct TimeseriesMetadata { + char* measurement_name; + TSDataType data_type; + int32_t chunk_meta_count; + TimeseriesStatistic statistic; +} TimeseriesMetadata; + +typedef struct DeviceTimeseriesMetadataEntry { + DeviceID device; + TimeseriesMetadata* timeseries; + uint32_t timeseries_count; +} DeviceTimeseriesMetadataEntry; + +/** + * @brief Map device -> list of TimeseriesMetadata (C layout with explicit + * counts). + */ +typedef struct DeviceTimeseriesMetadataMap { + DeviceTimeseriesMetadataEntry* entries; + uint32_t device_count; +} DeviceTimeseriesMetadataMap; + +/** Sentinel: optional address for bindings when querying an empty device_id + * list (length 0). */ +extern const DeviceID tsfile_c_metadata_empty_device_list_marker; + typedef struct result_set_meta_data { char** column_names; TSDataType* data_types; @@ -316,6 +366,34 @@ ERRNO tsfile_writer_close(TsFileWriter writer); */ ERRNO tsfile_reader_close(TsFileReader reader); +/** + * @brief Lists all devices in the file. + * + * @param out_devices [out] Allocated array; caller frees with + * tsfile_free_device_id_array. + * @param out_length [out] Number of devices. + */ +ERRNO tsfile_reader_get_all_devices(TsFileReader reader, DeviceID** out_devices, + uint32_t* out_length); + +void tsfile_free_device_id_array(DeviceID* devices, uint32_t length); + +/** + * @brief Timeseries metadata for none, some, or all devices. + * + * @param device_ids NULL: all devices (length ignored). + * Non-NULL with length==0: empty result (E_OK), device_ids + * not read. Non-NULL with length>0: only these devices (existing only). + * @param out_map [out] Must point to zeroed struct; filled on success. + * Free with tsfile_free_device_timeseries_metadata_map. + */ +ERRNO tsfile_reader_get_timeseries_metadata( + TsFileReader reader, const DeviceID* device_ids, uint32_t length, + DeviceTimeseriesMetadataMap* out_map); + +void tsfile_free_device_timeseries_metadata_map( + DeviceTimeseriesMetadataMap* map); + /*--------------------------Tablet API------------------------ */ /** diff --git a/cpp/test/cwrapper/cwrapper_metadata_test.cc b/cpp/test/cwrapper/cwrapper_metadata_test.cc new file mode 100644 index 00000000..16b29df5 --- /dev/null +++ b/cpp/test/cwrapper/cwrapper_metadata_test.cc @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <gtest/gtest.h> +#include <unistd.h> + +#include <cstring> +#include <string> + +extern "C" { +#include "cwrapper/errno_define_c.h" +#include "cwrapper/tsfile_cwrapper.h" +} + +namespace cwrapper_metadata { + +class CWrapperMetadataTest : public testing::Test {}; + +TEST_F(CWrapperMetadataTest, GetAllDevicesAndMetadataWithStatistic) { + ERRNO code = RET_OK; + const char* filename = "cwrapper_metadata_stat.tsfile"; + remove(filename); + + const char* device = "root.sg.d1"; + char* m_int = strdup("s_int"); + timeseries_schema sch{}; + sch.timeseries_name = m_int; + sch.data_type = TS_DATATYPE_INT32; + sch.encoding = TS_ENCODING_PLAIN; + sch.compression = TS_COMPRESSION_UNCOMPRESSED; + + auto* writer = static_cast<void*>( + _tsfile_writer_new(filename, 128 * 1024 * 1024, &code)); + ASSERT_EQ(RET_OK, code); + ASSERT_EQ(RET_OK, _tsfile_writer_register_timeseries(writer, device, &sch)); + + for (int row = 0; row < 3; row++) { + auto* record = static_cast<TsRecord>( + _ts_record_new(device, static_cast<int64_t>(row + 1), 1)); + const int32_t v = static_cast<int32_t>((row + 1) * 10); + ASSERT_EQ(RET_OK, _insert_data_into_ts_record_by_name_int32_t( + record, m_int, v)); + ASSERT_EQ(RET_OK, _tsfile_writer_write_ts_record(writer, record)); + _free_tsfile_ts_record(reinterpret_cast<TsRecord*>(&record)); + } + ASSERT_EQ(RET_OK, _tsfile_writer_close(writer)); + + TsFileReader reader = tsfile_reader_new(filename, &code); + ASSERT_EQ(RET_OK, code); + ASSERT_NE(nullptr, reader); + + DeviceID* devices = nullptr; + uint32_t n_dev = 0; + ASSERT_EQ(RET_OK, tsfile_reader_get_all_devices(reader, &devices, &n_dev)); + ASSERT_EQ(1u, n_dev); + ASSERT_NE(nullptr, devices); + ASSERT_STREQ(device, devices[0].path); + tsfile_free_device_id_array(devices, n_dev); + + DeviceTimeseriesMetadataMap map{}; + ASSERT_EQ(RET_OK, + tsfile_reader_get_timeseries_metadata(reader, nullptr, 0, &map)); + ASSERT_EQ(1u, map.device_count); + ASSERT_NE(nullptr, map.entries); + ASSERT_STREQ(device, map.entries[0].device.path); + ASSERT_EQ(1u, map.entries[0].timeseries_count); + ASSERT_NE(nullptr, map.entries[0].timeseries); + TimeseriesMetadata& tm = map.entries[0].timeseries[0]; + ASSERT_STREQ(m_int, tm.measurement_name); + ASSERT_EQ(TS_DATATYPE_INT32, tm.data_type); + ASSERT_TRUE(tm.statistic.has_statistic); + EXPECT_EQ(3, tm.statistic.row_count); + EXPECT_EQ(1, tm.statistic.start_time); + EXPECT_EQ(3, tm.statistic.end_time); + ASSERT_TRUE(tm.statistic.sum_valid); + EXPECT_DOUBLE_EQ(60.0, tm.statistic.sum); + + tsfile_free_device_timeseries_metadata_map(&map); + + DeviceTimeseriesMetadataMap empty{}; + ASSERT_EQ(RET_OK, tsfile_reader_get_timeseries_metadata( + reader, &tsfile_c_metadata_empty_device_list_marker, + 0, &empty)); + EXPECT_EQ(0u, empty.device_count); + EXPECT_EQ(nullptr, empty.entries); + + DeviceID q{}; + q.path = const_cast<char*>(device); + DeviceTimeseriesMetadataMap one{}; + ASSERT_EQ(RET_OK, + tsfile_reader_get_timeseries_metadata(reader, &q, 1, &one)); + ASSERT_EQ(1u, one.device_count); + tsfile_free_device_timeseries_metadata_map(&one); + + ASSERT_EQ(RET_OK, tsfile_reader_close(reader)); + free(m_int); + remove(filename); +} + +TEST_F(CWrapperMetadataTest, GetTimeseriesMetadataInvalidArgs) { + ERRNO code = RET_OK; + const char* filename = "cwrapper_metadata_empty.tsfile"; + remove(filename); + + auto* writer = static_cast<void*>( + _tsfile_writer_new(filename, 128 * 1024 * 1024, &code)); + ASSERT_EQ(RET_OK, code); + ASSERT_EQ(RET_OK, _tsfile_writer_close(writer)); + + TsFileReader reader = tsfile_reader_new(filename, &code); + ASSERT_EQ(RET_OK, code); + + DeviceTimeseriesMetadataMap map{}; + EXPECT_NE(RET_OK, + tsfile_reader_get_timeseries_metadata(nullptr, nullptr, 0, &map)); + EXPECT_NE(RET_OK, tsfile_reader_get_timeseries_metadata(reader, nullptr, 0, + nullptr)); + + ASSERT_EQ(RET_OK, tsfile_reader_close(reader)); + remove(filename); +} + +} // namespace cwrapper_metadata diff --git a/python/tests/test_reader_metadata.py b/python/tests/test_reader_metadata.py new file mode 100644 index 00000000..f58c0f67 --- /dev/null +++ b/python/tests/test_reader_metadata.py @@ -0,0 +1,84 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import os +import tempfile + +import pytest + +from tsfile import Field, RowRecord, TimeseriesSchema, TsFileReader, TsFileWriter +from tsfile import TSDataType +from tsfile.schema import DeviceID + + +def test_get_all_devices_and_timeseries_metadata_statistic(): + path = os.path.join(tempfile.gettempdir(), "py_reader_metadata_stat.tsfile") + try: + os.unlink(path) + except OSError: + pass + + device = "root.sg.py_meta" + writer = TsFileWriter(path) + writer.register_timeseries( + device, TimeseriesSchema("m_int", TSDataType.INT32)) + for row in range(3): + v = (row + 1) * 10 + writer.write_row_record( + RowRecord( + device, + row + 1, + [Field("m_int", v, TSDataType.INT32)], + ) + ) + writer.close() + + reader = TsFileReader(path) + try: + devices = reader.get_all_devices() + assert len(devices) == 1 + assert devices[0].path == device + + meta_all = reader.get_timeseries_metadata(None) + assert list(meta_all.keys()) == [device] + series = meta_all[device] + assert len(series) == 1 + m = series[0] + assert m.measurement_name == "m_int" + assert m.data_type == TSDataType.INT32 + st = m.statistic + assert st.has_statistic + assert st.row_count == 3 + assert st.start_time == 1 + assert st.end_time == 3 + assert st.sum_valid + assert st.sum == pytest.approx(60.0) + + assert reader.get_timeseries_metadata([]) == {} + + sub = reader.get_timeseries_metadata([DeviceID(device)]) + assert device in sub + assert len(sub[device]) == 1 + + sub_str = reader.get_timeseries_metadata([device]) + assert device in sub_str + finally: + reader.close() + try: + os.unlink(path) + except OSError: + pass diff --git a/python/tsfile/schema.py b/python/tsfile/schema.py index c89649bf..955253ea 100644 --- a/python/tsfile/schema.py +++ b/python/tsfile/schema.py @@ -15,12 +15,45 @@ # specific language governing permissions and limitations # under the License. # +from dataclasses import dataclass from typing import List from .exceptions import TypeMismatchError from .constants import TSDataType, ColumnCategory, TSEncoding, Compressor +@dataclass(frozen=True) +class DeviceID: + """Device path string as returned by the native reader (tree/table file layout).""" + + path: str + + def __str__(self) -> str: + return self.path + + +@dataclass(frozen=True) +class TimeseriesStatistic: + """Subset of file chunk statistic exposed through the C API.""" + + has_statistic: bool + row_count: int + start_time: int + end_time: int + sum_valid: bool + sum: float + + +@dataclass(frozen=True) +class TimeseriesMetadata: + """Per-measurement metadata from get_timeseries_metadata (includes statistic when present).""" + + measurement_name: str + data_type: TSDataType + chunk_meta_count: int + statistic: TimeseriesStatistic + + class TimeseriesSchema: """ Metadata schema for a time series (name, data type, encoding, compression). diff --git a/python/tsfile/tsfile_cpp.pxd b/python/tsfile/tsfile_cpp.pxd index 29008148..22f32459 100644 --- a/python/tsfile/tsfile_cpp.pxd +++ b/python/tsfile/tsfile_cpp.pxd @@ -103,6 +103,34 @@ cdef extern from "cwrapper/tsfile_cwrapper.h": TimeseriesSchema * timeseries_schema int timeseries_num + ctypedef struct DeviceID: + char * path + + ctypedef struct TimeseriesStatistic: + bint has_statistic + int32_t row_count + int64_t start_time + int64_t end_time + bint sum_valid + double sum + + ctypedef struct TimeseriesMetadata: + char * measurement_name + TSDataType data_type + int32_t chunk_meta_count + TimeseriesStatistic statistic + + ctypedef struct DeviceTimeseriesMetadataEntry: + DeviceID device + TimeseriesMetadata * timeseries + uint32_t timeseries_count + + ctypedef struct DeviceTimeseriesMetadataMap: + DeviceTimeseriesMetadataEntry * entries + uint32_t device_count + + const DeviceID tsfile_c_metadata_empty_device_list_marker + ctypedef struct ResultSetMetaData: char** column_names TSDataType * data_types @@ -218,6 +246,17 @@ cdef extern from "cwrapper/tsfile_cwrapper.h": DeviceSchema * tsfile_reader_get_all_timeseries_schemas(TsFileReader reader, uint32_t * size); + ErrorCode tsfile_reader_get_all_devices(TsFileReader reader, + DeviceID ** out_devices, + uint32_t * out_length); + void tsfile_free_device_id_array(DeviceID * devices, uint32_t length); + + ErrorCode tsfile_reader_get_timeseries_metadata( + TsFileReader reader, const DeviceID * device_ids, uint32_t length, + DeviceTimeseriesMetadataMap * out_map); + void tsfile_free_device_timeseries_metadata_map( + DeviceTimeseriesMetadataMap * map); + # resultSet : get data from resultSet bint tsfile_result_set_next(ResultSet result_set, ErrorCode * err_code); bint tsfile_result_set_is_null_by_index(ResultSet result_set, uint32_t column_index); diff --git a/python/tsfile/tsfile_py_cpp.pxd b/python/tsfile/tsfile_py_cpp.pxd index 197a4ec8..b6baee80 100644 --- a/python/tsfile/tsfile_py_cpp.pxd +++ b/python/tsfile/tsfile_py_cpp.pxd @@ -67,5 +67,8 @@ cdef public api ResultSet tsfile_reader_query_table_by_row_c(TsFileReader reader cdef public api object get_table_schema(TsFileReader reader, object table_name) cdef public api object get_all_table_schema(TsFileReader reader) cdef public api object get_all_timeseries_schema(TsFileReader reader) +cdef public api object reader_get_all_devices_c(TsFileReader reader) +cdef public api object reader_get_timeseries_metadata_c(TsFileReader reader, + object device_ids) cpdef public api object get_tsfile_config() cpdef public api void set_tsfile_config(dict new_config) \ No newline at end of file diff --git a/python/tsfile/tsfile_py_cpp.pyx b/python/tsfile/tsfile_py_cpp.pyx index 4febeb73..d913b0c4 100644 --- a/python/tsfile/tsfile_py_cpp.pyx +++ b/python/tsfile/tsfile_py_cpp.pyx @@ -26,6 +26,7 @@ import numpy as np from libc.stdlib cimport free from libc.stdlib cimport malloc from libc.string cimport strdup +from libc.string cimport memset from cpython.exc cimport PyErr_SetObject from cpython.unicode cimport PyUnicode_AsUTF8String, PyUnicode_AsUTF8, PyUnicode_AsUTF8AndSize from cpython.bytes cimport PyBytes_AsString, PyBytes_AsStringAndSize @@ -36,6 +37,9 @@ from tsfile.schema import TSDataType as TSDataTypePy, TSEncoding as TSEncodingPy from tsfile.schema import Compressor as CompressorPy, ColumnCategory as CategoryPy from tsfile.schema import TableSchema as TableSchemaPy, ColumnSchema as ColumnSchemaPy from tsfile.schema import DeviceSchema as DeviceSchemaPy, TimeseriesSchema as TimeseriesSchemaPy +from tsfile.schema import DeviceID as ReaderDeviceID +from tsfile.schema import TimeseriesStatistic as TimeseriesStatisticPy +from tsfile.schema import TimeseriesMetadata as TimeseriesMetadataPy # check exception and set py exception object cdef inline void check_error(int errcode, const char * context=NULL) except*: @@ -922,3 +926,105 @@ cdef object get_all_timeseries_schema(TsFileReader reader): device_schemas.update([(schema_py.get_device_name(), schema_py)]) free(schemas) return device_schemas + +cdef object timeseries_metadata_c_to_py(TimeseriesMetadata* m): + cdef str name_py + if m == NULL or m.measurement_name == NULL: + name_py = "" + else: + name_py = m.measurement_name.decode('utf-8') + cdef object stat = TimeseriesStatisticPy( + bool(m.statistic.has_statistic), + int(m.statistic.row_count), + int(m.statistic.start_time), + int(m.statistic.end_time), + bool(m.statistic.sum_valid), + float(m.statistic.sum), + ) + return TimeseriesMetadataPy( + name_py, + TSDataTypePy(m.data_type), + int(m.chunk_meta_count), + stat, + ) + +cdef dict device_timeseries_metadata_map_to_py(DeviceTimeseriesMetadataMap* mmap): + cdef dict out = {} + cdef uint32_t di, ti + cdef char* p + cdef str key + cdef list series + for di in range(mmap.device_count): + p = mmap.entries[di].device.path + if p == NULL: + key = "" + else: + key = p.decode('utf-8') + series = [] + for ti in range(mmap.entries[di].timeseries_count): + series.append( + timeseries_metadata_c_to_py( + &mmap.entries[di].timeseries[ti])) + out[key] = series + return out + +cdef public api object reader_get_all_devices_c(TsFileReader reader): + cdef DeviceID* arr = NULL + cdef uint32_t n = 0 + cdef int err + cdef list out = [] + cdef uint32_t i + err = tsfile_reader_get_all_devices(reader, &arr, &n) + check_error(err) + try: + for i in range(n): + out.append(ReaderDeviceID(arr[i].path.decode('utf-8'))) + finally: + tsfile_free_device_id_array(arr, n) + return out + +cdef public api object reader_get_timeseries_metadata_c(TsFileReader reader, + object device_ids): + cdef DeviceTimeseriesMetadataMap mmap + cdef DeviceID* q = NULL + cdef uint32_t qlen = 0 + cdef uint32_t i + cdef int err + cdef bytes bpath + cdef const char* raw + memset(&mmap, 0, sizeof(DeviceTimeseriesMetadataMap)) + if device_ids is None: + err = tsfile_reader_get_timeseries_metadata(reader, NULL, 0, &mmap) + check_error(err) + elif len(device_ids) == 0: + err = tsfile_reader_get_timeseries_metadata( + reader, &tsfile_c_metadata_empty_device_list_marker, 0, &mmap) + check_error(err) + else: + qlen = <uint32_t> len(device_ids) + q = <DeviceID*> malloc(sizeof(DeviceID) * qlen) + if q == NULL: + raise MemoryError() + memset(q, 0, sizeof(DeviceID) * qlen) + try: + for i in range(qlen): + dev = device_ids[i] + try: + path_s = dev.path + except AttributeError: + path_s = str(dev) + bpath = path_s.encode('utf-8') + raw = PyBytes_AsString(bpath) + q[i].path = strdup(raw) + if q[i].path == NULL: + raise MemoryError() + err = tsfile_reader_get_timeseries_metadata(reader, q, qlen, &mmap) + check_error(err) + finally: + for i in range(qlen): + free(q[i].path) + free(q) + try: + return device_timeseries_metadata_map_to_py(&mmap) + finally: + tsfile_free_device_timeseries_metadata_map(&mmap) diff --git a/python/tsfile/tsfile_reader.pyx b/python/tsfile/tsfile_reader.pyx index 3a1a15d4..52a9a94f 100644 --- a/python/tsfile/tsfile_reader.pyx +++ b/python/tsfile/tsfile_reader.pyx @@ -19,7 +19,7 @@ #cython: language_level=3 import weakref -from typing import List +from typing import List, Optional, Dict import pandas as pd from libc.stdint cimport INT64_MIN, INT64_MAX @@ -427,6 +427,21 @@ cdef class TsFileReaderPy: """ return get_all_timeseries_schema(self.reader) + def get_all_devices(self): + """ + Return all device IDs in the file as :class:`tsfile.schema.DeviceID`. + """ + return reader_get_all_devices_c(self.reader) + + def get_timeseries_metadata(self, device_ids: Optional[List] = None) -> Dict[str, list]: + """ + Return map device path -> list of :class:`tsfile.schema.TimeseriesMetadata`. + + ``device_ids is None``: all devices. ``device_ids == []``: empty map. + Non-empty list restricts to those devices (only existing devices appear). + """ + return reader_get_timeseries_metadata_c(self.reader, device_ids) + def close(self): """ Close TsFile Reader, if reader has result sets, invalid them.
