This is an automated email from the ASF dual-hosted git repository. colinlee pushed a commit to branch support_tag_filter in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit 28c603202f1b3dd4ba5ded83b01b3a31ae858a8c Author: ColinLee <[email protected]> AuthorDate: Tue Apr 7 22:46:50 2026 +0800 support tag filter. --- cpp/src/cwrapper/tsfile_cwrapper.cc | 108 +++++++++++++++++ cpp/src/cwrapper/tsfile_cwrapper.h | 81 +++++++++++++ python/tests/test_tag_filter.py | 233 ++++++++++++++++++++++++++++++++++++ python/tsfile/__init__.py | 2 + python/tsfile/tag_filter.py | 151 +++++++++++++++++++++++ python/tsfile/tsfile_cpp.pxd | 48 ++++++++ python/tsfile/tsfile_py_cpp.pxd | 5 + python/tsfile/tsfile_py_cpp.pyx | 30 +++++ python/tsfile/tsfile_reader.pyx | 78 +++++++++++- 9 files changed, 731 insertions(+), 5 deletions(-) diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc b/cpp/src/cwrapper/tsfile_cwrapper.cc index e6ecef2a..a293080a 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.cc +++ b/cpp/src/cwrapper/tsfile_cwrapper.cc @@ -32,6 +32,7 @@ #include "common/statistic.h" #include "common/tablet.h" #include "common/tsfile_common.h" +#include "reader/filter/tag_filter.h" #include "reader/result_set.h" #include "reader/table_result_set.h" #include "reader/tsfile_reader.h" @@ -1479,6 +1480,113 @@ ResultSet _tsfile_reader_query_device(TsFileReader reader, *err_code = r->query(selected_paths, start_time, end_time, qds); return qds; } + +// ---------- Tag Filter API ---------- + +TagFilterHandle tsfile_tag_filter_create(TsFileReader reader, + const char* table_name, + const char* column_name, + const char* value, TagFilterOp op, + ERRNO* err_code) { + auto* r = static_cast<storage::TsFileReader*>(reader); + auto schema = r->get_table_schema(table_name); + if (!schema) { + *err_code = common::E_INVALID_ARG; + return nullptr; + } + storage::TagFilterBuilder builder(schema.get()); + storage::Filter* filter = nullptr; + switch (op) { + case TAG_FILTER_EQ: + filter = builder.eq(column_name, value); + break; + case TAG_FILTER_NEQ: + filter = builder.neq(column_name, value); + break; + case TAG_FILTER_LT: + filter = builder.lt(column_name, value); + break; 
+ case TAG_FILTER_LTEQ: + filter = builder.lteq(column_name, value); + break; + case TAG_FILTER_GT: + filter = builder.gt(column_name, value); + break; + case TAG_FILTER_GTEQ: + filter = builder.gteq(column_name, value); + break; + case TAG_FILTER_REGEXP: + filter = builder.reg_exp(column_name, value); + break; + case TAG_FILTER_NOT_REGEXP: + filter = builder.not_reg_exp(column_name, value); + break; + default: + *err_code = common::E_INVALID_ARG; + return nullptr; + } + *err_code = common::E_OK; + return static_cast<void*>(filter); +} + +TagFilterHandle tsfile_tag_filter_between(TsFileReader reader, + const char* table_name, + const char* column_name, + const char* lower, const char* upper, + bool is_not, ERRNO* err_code) { + auto* r = static_cast<storage::TsFileReader*>(reader); + auto schema = r->get_table_schema(table_name); + if (!schema) { + *err_code = common::E_INVALID_ARG; + return nullptr; + } + storage::TagFilterBuilder builder(schema.get()); + storage::Filter* filter = + is_not ? 
builder.not_between_and(column_name, lower, upper) + : builder.between_and(column_name, lower, upper); + *err_code = common::E_OK; + return static_cast<void*>(filter); +} + +TagFilterHandle tsfile_tag_filter_and(TagFilterHandle left, + TagFilterHandle right) { + return static_cast<void*>(storage::TagFilterBuilder::and_filter( + static_cast<storage::Filter*>(left), + static_cast<storage::Filter*>(right))); +} + +TagFilterHandle tsfile_tag_filter_or(TagFilterHandle left, + TagFilterHandle right) { + return static_cast<void*>(storage::TagFilterBuilder::or_filter( + static_cast<storage::Filter*>(left), + static_cast<storage::Filter*>(right))); +} + +TagFilterHandle tsfile_tag_filter_not(TagFilterHandle filter) { + return static_cast<void*>(storage::TagFilterBuilder::not_filter( + static_cast<storage::Filter*>(filter))); +} + +void tsfile_tag_filter_free(TagFilterHandle filter) { + delete static_cast<storage::Filter*>(filter); +} + +ResultSet tsfile_query_table_with_tag_filter( + TsFileReader reader, const char* table_name, char** columns, + uint32_t column_num, Timestamp start_time, Timestamp end_time, + TagFilterHandle tag_filter, int batch_size, ERRNO* err_code) { + auto* r = static_cast<storage::TsFileReader*>(reader); + storage::ResultSet* table_result_set = nullptr; + std::vector<std::string> column_names; + for (uint32_t i = 0; i < column_num; i++) { + column_names.emplace_back(columns[i]); + } + *err_code = r->query(table_name, column_names, start_time, end_time, + table_result_set, + static_cast<storage::Filter*>(tag_filter), batch_size); + return table_result_set; +} + #ifdef __cplusplus } #endif \ No newline at end of file diff --git a/cpp/src/cwrapper/tsfile_cwrapper.h b/cpp/src/cwrapper/tsfile_cwrapper.h index 6c0e6d2c..619063b6 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.h +++ b/cpp/src/cwrapper/tsfile_cwrapper.h @@ -859,6 +859,87 @@ TableSchema* tsfile_reader_get_all_table_schemas(TsFileReader reader, DeviceSchema* 
tsfile_reader_get_all_timeseries_schemas(TsFileReader reader, uint32_t* size); +// ---------- Tag Filter API ---------- + +/** + * @brief Opaque handle representing a tag filter for device-level filtering. + */ +typedef void* TagFilterHandle; + +/** + * @brief Tag filter comparison operators. + */ +typedef enum { + TAG_FILTER_EQ = 0, + TAG_FILTER_NEQ = 1, + TAG_FILTER_LT = 2, + TAG_FILTER_LTEQ = 3, + TAG_FILTER_GT = 4, + TAG_FILTER_GTEQ = 5, + TAG_FILTER_REGEXP = 6, + TAG_FILTER_NOT_REGEXP = 7, +} TagFilterOp; + +/** + * @brief Create a tag filter with a comparison operator. + * + * @param reader [in] TsFileReader handle (used to resolve column name to + * index). + * @param table_name [in] Table name whose schema defines the TAG columns. + * @param column_name [in] Name of the TAG column to filter on. + * @param value [in] Comparison value (string). + * @param op [in] Comparison operator (TagFilterOp). + * @param err_code [out] Error code. E_OK(0) on success. + * @return TagFilterHandle on success; NULL on failure. + */ +TagFilterHandle tsfile_tag_filter_create(TsFileReader reader, + const char* table_name, + const char* column_name, + const char* value, TagFilterOp op, + ERRNO* err_code); + +/** + * @brief Create a BETWEEN tag filter (lower <= column <= upper). + */ +TagFilterHandle tsfile_tag_filter_between(TsFileReader reader, + const char* table_name, + const char* column_name, + const char* lower, const char* upper, + bool is_not, ERRNO* err_code); + +/** + * @brief Combine two tag filters with AND. + */ +TagFilterHandle tsfile_tag_filter_and(TagFilterHandle left, + TagFilterHandle right); + +/** + * @brief Combine two tag filters with OR. + */ +TagFilterHandle tsfile_tag_filter_or(TagFilterHandle left, + TagFilterHandle right); + +/** + * @brief Negate a tag filter. + */ +TagFilterHandle tsfile_tag_filter_not(TagFilterHandle filter); + +/** + * @brief Free a tag filter and all its children. 
+ */ +void tsfile_tag_filter_free(TagFilterHandle filter); + +/** + * @brief Query table with tag filter. + * + * @param batch_size <= 0 means row-by-row return mode, + * > 0 means return TsBlock with the specified block size. + */ +ResultSet tsfile_query_table_with_tag_filter( + TsFileReader reader, const char* table_name, char** columns, + uint32_t column_num, Timestamp start_time, Timestamp end_time, + TagFilterHandle tag_filter, int batch_size, ERRNO* err_code); + // Close and free resource. void free_tablet(Tablet* tablet); void free_tsfile_result_set(ResultSet* result_set); diff --git a/python/tests/test_tag_filter.py b/python/tests/test_tag_filter.py new file mode 100644 index 00000000..69f48af5 --- /dev/null +++ b/python/tests/test_tag_filter.py @@ -0,0 +1,233 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +import os + +import pytest + +from tsfile import ( + ColumnSchema, TableSchema, TSDataType, ColumnCategory, + TsFileTableWriter, TsFileReader, Tablet, + tag_eq, tag_neq, tag_lt, tag_lteq, tag_gt, tag_gteq, + tag_regexp, tag_not_regexp, tag_between, tag_not_between, +) + +TSFILE_PATH = "test_tag_filter.tsfile" + +# Schema: table "sensors" with TAG columns "region" and "device", +# and FIELD column "value" (DOUBLE). +TABLE_NAME = "sensors" +COLUMNS = [ + ColumnSchema("region", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("value", TSDataType.DOUBLE, ColumnCategory.FIELD), +] +SCHEMA = TableSchema(TABLE_NAME, COLUMNS) + +# Test data: +# region | device | timestamps | values +# north | dev_a | 0..4 | 0.0..4.0 +# north | dev_b | 5..9 | 5.0..9.0 +# south | dev_c | 10..14 | 10.0..14.0 +# east | dev_d | 15..19 | 15.0..19.0 +DEVICES = [ + ("north", "dev_a", 0, 5), + ("north", "dev_b", 5, 10), + ("south", "dev_c", 10, 15), + ("east", "dev_d", 15, 20), +] + + +@pytest.fixture(scope="module", autouse=True) +def create_tsfile(): + """Write the test TsFile once for all tests in this module.""" + if os.path.exists(TSFILE_PATH): + os.remove(TSFILE_PATH) + + with TsFileTableWriter(TSFILE_PATH, SCHEMA) as writer: + for region, device, start, end in DEVICES: + count = end - start + tablet = Tablet( + ["region", "device", "value"], + [TSDataType.STRING, TSDataType.STRING, TSDataType.DOUBLE], + count, + ) + for i in range(count): + ts = start + i + tablet.add_timestamp(i, ts) + tablet.add_value_by_name("region", i, region) + tablet.add_value_by_name("device", i, device) + tablet.add_value_by_name("value", i, float(ts)) + writer.write_table(tablet) + + yield + + if os.path.exists(TSFILE_PATH): + os.remove(TSFILE_PATH) + + +def _query_values(reader, tag_filter): + """Helper: query all columns with the given tag_filter, return list of (region, device, value) tuples.""" + result = 
reader.query_table(TABLE_NAME, ["region", "device", "value"], tag_filter=tag_filter) + rows = [] + while result.next(): + region = result.get_value_by_name("region") + device = result.get_value_by_name("device") + value = result.get_value_by_name("value") + rows.append((region, device, value)) + result.close() + return rows + + +def test_tag_eq(): + with TsFileReader(TSFILE_PATH) as reader: + rows = _query_values(reader, tag_eq("region", "north")) + assert len(rows) == 10 # dev_a (5) + dev_b (5) + assert all(r[0] == "north" for r in rows) + + +def test_tag_neq(): + with TsFileReader(TSFILE_PATH) as reader: + rows = _query_values(reader, tag_neq("region", "north")) + assert len(rows) == 10 # south (5) + east (5) + assert all(r[0] != "north" for r in rows) + + +def test_tag_eq_device(): + with TsFileReader(TSFILE_PATH) as reader: + rows = _query_values(reader, tag_eq("device", "dev_c")) + assert len(rows) == 5 + assert all(r[1] == "dev_c" for r in rows) + assert all(r[0] == "south" for r in rows) + + +def test_tag_lt(): + with TsFileReader(TSFILE_PATH) as reader: + # Lexicographic: "east" < "north" < "south" + rows = _query_values(reader, tag_lt("region", "north")) + assert len(rows) == 5 # only "east" + assert all(r[0] == "east" for r in rows) + + +def test_tag_lteq(): + with TsFileReader(TSFILE_PATH) as reader: + rows = _query_values(reader, tag_lteq("region", "north")) + assert len(rows) == 15 # "east" (5) + "north" (10) + + +def test_tag_gt(): + with TsFileReader(TSFILE_PATH) as reader: + rows = _query_values(reader, tag_gt("region", "north")) + assert len(rows) == 5 # only "south" + assert all(r[0] == "south" for r in rows) + + +def test_tag_gteq(): + with TsFileReader(TSFILE_PATH) as reader: + rows = _query_values(reader, tag_gteq("region", "north")) + assert len(rows) == 15 # "north" (10) + "south" (5) + + +def test_tag_between(): + with TsFileReader(TSFILE_PATH) as reader: + # Between "east" and "north" (inclusive) + rows = _query_values(reader, 
tag_between("region", "east", "north")) + assert len(rows) == 15 # "east" (5) + "north" (10) + + +def test_tag_not_between(): + with TsFileReader(TSFILE_PATH) as reader: + rows = _query_values(reader, tag_not_between("region", "east", "north")) + assert len(rows) == 5 # only "south" + assert all(r[0] == "south" for r in rows) + + +def test_tag_regexp(): + with TsFileReader(TSFILE_PATH) as reader: + # Match regions starting with 'n' or 's' + rows = _query_values(reader, tag_regexp("region", "^[ns]")) + assert len(rows) == 15 # "north" (10) + "south" (5) + + +def test_tag_not_regexp(): + with TsFileReader(TSFILE_PATH) as reader: + rows = _query_values(reader, tag_not_regexp("region", "^[ns]")) + assert len(rows) == 5 # only "east" + assert all(r[0] == "east" for r in rows) + + +def test_tag_and(): + with TsFileReader(TSFILE_PATH) as reader: + f = tag_eq("region", "north") & tag_eq("device", "dev_a") + rows = _query_values(reader, f) + assert len(rows) == 5 + assert all(r[0] == "north" and r[1] == "dev_a" for r in rows) + + +def test_tag_or(): + with TsFileReader(TSFILE_PATH) as reader: + f = tag_eq("region", "south") | tag_eq("region", "east") + rows = _query_values(reader, f) + assert len(rows) == 10 + regions = {r[0] for r in rows} + assert regions == {"south", "east"} + + +def test_tag_not(): + with TsFileReader(TSFILE_PATH) as reader: + f = ~tag_eq("region", "north") + rows = _query_values(reader, f) + assert len(rows) == 10 # south + east + assert all(r[0] != "north" for r in rows) + + +def test_tag_complex_combination(): + with TsFileReader(TSFILE_PATH) as reader: + # (region == "north" AND device == "dev_b") OR region == "east" + f = (tag_eq("region", "north") & tag_eq("device", "dev_b")) | tag_eq("region", "east") + rows = _query_values(reader, f) + assert len(rows) == 10 # dev_b (5) + east (5) + for r in rows: + assert (r[0] == "north" and r[1] == "dev_b") or r[0] == "east" + + +def test_no_tag_filter(): + """Verify that query_table without tag_filter still 
works (backward compat).""" + with TsFileReader(TSFILE_PATH) as reader: + result = reader.query_table(TABLE_NAME, ["region", "device", "value"]) + rows = [] + while result.next(): + rows.append(result.get_value_by_name("region")) + result.close() + assert len(rows) == 20 + + +def test_tag_filter_with_time_range(): + """Tag filter combined with time range.""" + with TsFileReader(TSFILE_PATH) as reader: + result = reader.query_table( + TABLE_NAME, ["region", "device", "value"], + start_time=0, end_time=7, + tag_filter=tag_eq("region", "north"), + ) + rows = [] + while result.next(): + rows.append(result.get_value_by_name("value")) + result.close() + # north has timestamps 0..9, but time range 0..7 gives 8 rows + assert len(rows) == 8 diff --git a/python/tsfile/__init__.py b/python/tsfile/__init__.py index 55fa3b9e..9ca386ef 100644 --- a/python/tsfile/__init__.py +++ b/python/tsfile/__init__.py @@ -37,6 +37,8 @@ from .field import * from .date_utils import * from .exceptions import * from .tsfile_reader import TsFileReaderPy as TsFileReader, ResultSetPy as ResultSet +from .tag_filter import (TagFilter, tag_eq, tag_neq, tag_lt, tag_lteq, tag_gt, tag_gteq, + tag_regexp, tag_not_regexp, tag_between, tag_not_between) from .tsfile_writer import TsFileWriterPy as TsFileWriter from .tsfile_py_cpp import get_tsfile_config, set_tsfile_config from .tsfile_table_writer import TsFileTableWriter diff --git a/python/tsfile/tag_filter.py b/python/tsfile/tag_filter.py new file mode 100644 index 00000000..e5107263 --- /dev/null +++ b/python/tsfile/tag_filter.py @@ -0,0 +1,151 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +class TagFilter: + """Base class for tag filters used in table queries.""" + + def __and__(self, other): + return AndTagFilter(self, other) + + def __or__(self, other): + return OrTagFilter(self, other) + + def __invert__(self): + return NotTagFilter(self) + + +class ComparisonTagFilter(TagFilter): + """A tag filter comparing a column to a value with a given operator.""" + + # Operator constants matching TagFilterOp enum in C + EQ = 0 + NEQ = 1 + LT = 2 + LTEQ = 3 + GT = 4 + GTEQ = 5 + REGEXP = 6 + NOT_REGEXP = 7 + + def __init__(self, column_name: str, value: str, op: int): + self.column_name = column_name + self.value = value + self.op = op + + def __repr__(self): + op_names = {0: "==", 1: "!=", 2: "<", 3: "<=", 4: ">", 5: ">=", + 6: "=~", 7: "!~"} + return f"TagFilter({self.column_name} {op_names.get(self.op, '?')} {self.value!r})" + + +class BetweenTagFilter(TagFilter): + """A tag filter for range queries.""" + + def __init__(self, column_name: str, lower: str, upper: str, is_not: bool = False): + self.column_name = column_name + self.lower = lower + self.upper = upper + self.is_not = is_not + + def __repr__(self): + op = "NOT BETWEEN" if self.is_not else "BETWEEN" + return f"TagFilter({self.column_name} {op} {self.lower!r} AND {self.upper!r})" + + +class AndTagFilter(TagFilter): + """Logical AND of two tag filters.""" + + def __init__(self, left: TagFilter, right: TagFilter): + self.left = left + self.right = right + + def __repr__(self): + return f"({self.left} AND {self.right})" + + +class OrTagFilter(TagFilter): + """Logical OR of two tag 
filters.""" + + def __init__(self, left: TagFilter, right: TagFilter): + self.left = left + self.right = right + + def __repr__(self): + return f"({self.left} OR {self.right})" + + +class NotTagFilter(TagFilter): + """Logical NOT of a tag filter.""" + + def __init__(self, filter: TagFilter): + self.filter = filter + + def __repr__(self): + return f"NOT({self.filter})" + + +# Convenience factory functions +def tag_eq(column_name: str, value: str) -> TagFilter: + """Create a tag equality filter: column == value.""" + return ComparisonTagFilter(column_name, value, ComparisonTagFilter.EQ) + + +def tag_neq(column_name: str, value: str) -> TagFilter: + """Create a tag inequality filter: column != value.""" + return ComparisonTagFilter(column_name, value, ComparisonTagFilter.NEQ) + + +def tag_lt(column_name: str, value: str) -> TagFilter: + """Create a tag less-than filter: column < value.""" + return ComparisonTagFilter(column_name, value, ComparisonTagFilter.LT) + + +def tag_lteq(column_name: str, value: str) -> TagFilter: + """Create a tag less-than-or-equal filter: column <= value.""" + return ComparisonTagFilter(column_name, value, ComparisonTagFilter.LTEQ) + + +def tag_gt(column_name: str, value: str) -> TagFilter: + """Create a tag greater-than filter: column > value.""" + return ComparisonTagFilter(column_name, value, ComparisonTagFilter.GT) + + +def tag_gteq(column_name: str, value: str) -> TagFilter: + """Create a tag greater-than-or-equal filter: column >= value.""" + return ComparisonTagFilter(column_name, value, ComparisonTagFilter.GTEQ) + + +def tag_regexp(column_name: str, pattern: str) -> TagFilter: + """Create a tag regex match filter.""" + return ComparisonTagFilter(column_name, pattern, ComparisonTagFilter.REGEXP) + + +def tag_not_regexp(column_name: str, pattern: str) -> TagFilter: + """Create a tag regex not-match filter.""" + return ComparisonTagFilter(column_name, pattern, ComparisonTagFilter.NOT_REGEXP) + + +def tag_between(column_name: str, lower: 
str, upper: str) -> TagFilter: + """Create a tag BETWEEN filter: lower <= column <= upper.""" + return BetweenTagFilter(column_name, lower, upper, is_not=False) + + +def tag_not_between(column_name: str, lower: str, upper: str) -> TagFilter: + """Create a tag NOT BETWEEN filter.""" + return BetweenTagFilter(column_name, lower, upper, is_not=True) diff --git a/python/tsfile/tsfile_cpp.pxd b/python/tsfile/tsfile_cpp.pxd index 74f3a7d9..3eeff6cf 100644 --- a/python/tsfile/tsfile_cpp.pxd +++ b/python/tsfile/tsfile_cpp.pxd @@ -306,6 +306,54 @@ cdef extern from "cwrapper/tsfile_cwrapper.h": void tsfile_free_device_timeseries_metadata_map( DeviceTimeseriesMetadataMap * map); + # Tag filter types and functions + ctypedef void * TagFilterHandle + + ctypedef enum TagFilterOp: + TAG_FILTER_EQ = 0, + TAG_FILTER_NEQ = 1, + TAG_FILTER_LT = 2, + TAG_FILTER_LTEQ = 3, + TAG_FILTER_GT = 4, + TAG_FILTER_GTEQ = 5, + TAG_FILTER_REGEXP = 6, + TAG_FILTER_NOT_REGEXP = 7, + + TagFilterHandle tsfile_tag_filter_create(TsFileReader reader, + const char* table_name, + const char* column_name, + const char* value, + TagFilterOp op, + ErrorCode* err_code) + + TagFilterHandle tsfile_tag_filter_between(TsFileReader reader, + const char* table_name, + const char* column_name, + const char* lower, + const char* upper, + bint is_not, + ErrorCode* err_code) + + TagFilterHandle tsfile_tag_filter_and(TagFilterHandle left, + TagFilterHandle right) + + TagFilterHandle tsfile_tag_filter_or(TagFilterHandle left, + TagFilterHandle right) + + TagFilterHandle tsfile_tag_filter_not(TagFilterHandle filter) + + void tsfile_tag_filter_free(TagFilterHandle filter) + + ResultSet tsfile_query_table_with_tag_filter(TsFileReader reader, + const char* table_name, + char** columns, + uint32_t column_num, + int64_t start_time, + int64_t end_time, + TagFilterHandle tag_filter, + int batch_size, + ErrorCode* err_code) + # resultSet : get data from resultSet bint tsfile_result_set_next(ResultSet result_set, ErrorCode * 
err_code); bint tsfile_result_set_is_null_by_index(ResultSet result_set, uint32_t column_index); diff --git a/python/tsfile/tsfile_py_cpp.pxd b/python/tsfile/tsfile_py_cpp.pxd index b6baee80..d9f18aa7 100644 --- a/python/tsfile/tsfile_py_cpp.pxd +++ b/python/tsfile/tsfile_py_cpp.pxd @@ -64,6 +64,11 @@ cdef public api ResultSet tsfile_reader_query_tree_by_row_c(TsFileReader reader, cdef public api ResultSet tsfile_reader_query_table_by_row_c(TsFileReader reader, object table_name, object column_list, int offset, int limit) +cdef public api ResultSet tsfile_reader_query_table_with_tag_filter_c(TsFileReader reader, object table_name, + object column_list, int64_t start_time, + int64_t end_time, TagFilterHandle tag_filter, + int batch_size) + cdef public api object get_table_schema(TsFileReader reader, object table_name) cdef public api object get_all_table_schema(TsFileReader reader) cdef public api object get_all_timeseries_schema(TsFileReader reader) diff --git a/python/tsfile/tsfile_py_cpp.pyx b/python/tsfile/tsfile_py_cpp.pyx index c564304f..7932ad3e 100644 --- a/python/tsfile/tsfile_py_cpp.pyx +++ b/python/tsfile/tsfile_py_cpp.pyx @@ -901,6 +901,36 @@ cdef ResultSet tsfile_reader_query_paths_c(TsFileReader reader, object device_na free(<void *> sensor_list_c) sensor_list_c = NULL +cdef ResultSet tsfile_reader_query_table_with_tag_filter_c(TsFileReader reader, object table_name, + object column_list, int64_t start_time, + int64_t end_time, TagFilterHandle tag_filter, + int batch_size): + cdef ResultSet result + cdef int column_num = len(column_list) + cdef bytes table_name_bytes = PyUnicode_AsUTF8String(table_name) + cdef const char * table_name_c = table_name_bytes + cdef char** columns = <char**> malloc(sizeof(char *) * column_num) + cdef int i + cdef ErrorCode code = 0 + if columns == NULL: + raise MemoryError("Failed to allocate memory for columns") + try: + for i in range(column_num): + columns[i] = strdup((<str> column_list[i]).encode('utf-8')) + if 
columns[i] == NULL: + raise MemoryError("Failed to allocate memory for column name") + result = tsfile_query_table_with_tag_filter(reader, table_name_c, columns, column_num, + start_time, end_time, tag_filter, batch_size, &code) + check_error(code) + return result + finally: + if columns != NULL: + for i in range(column_num): + free(<void *> columns[i]) + columns[i] = NULL + free(<void *> columns) + columns = NULL + cdef object get_table_schema(TsFileReader reader, object table_name): cdef bytes table_name_bytes = PyUnicode_AsUTF8String(table_name) cdef const char * table_name_c = table_name_bytes diff --git a/python/tsfile/tsfile_reader.pyx b/python/tsfile/tsfile_reader.pyx index 2259f770..6d0f7a51 100644 --- a/python/tsfile/tsfile_reader.pyx +++ b/python/tsfile/tsfile_reader.pyx @@ -31,6 +31,7 @@ from libc.stdint cimport INT64_MIN, INT64_MAX, uintptr_t from tsfile.schema import TSDataType as TSDataTypePy from tsfile.schema import DeviceID, DeviceTimeseriesMetadataGroup +from tsfile.tag_filter import TagFilter, ComparisonTagFilter, BetweenTagFilter, AndTagFilter, OrTagFilter, NotTagFilter from .date_utils import parse_int_to_date from .tsfile_cpp cimport * from .tsfile_py_cpp cimport * @@ -48,6 +49,9 @@ cdef class ResultSetPy: cdef ResultSet result cdef object metadata + # Tag filter handle owned by this result set (freed on close). + cdef TagFilterHandle _tag_filter_handle + # ResultSet is valid or not, if the reader is closed, valid will be False. 
cdef object valid # The reader @@ -59,6 +63,7 @@ cdef class ResultSetPy: self.valid = True self.tsfile_reader = weakref.ref(tsfile_reader) self.is_tree = is_tree + self._tag_filter_handle = NULL cdef init_c(self, ResultSet result, object device_name): """ @@ -273,6 +278,10 @@ cdef class ResultSetPy: if self.result != NULL: free_tsfile_result_set(&self.result) + if self._tag_filter_handle != NULL: + tsfile_tag_filter_free(self._tag_filter_handle) + self._tag_filter_handle = NULL + if self.tsfile_reader is not None: reader = self.tsfile_reader() if reader is not None: @@ -318,20 +327,79 @@ cdef class TsFileReaderPy: self.reader = tsfile_reader_new_c(pathname) def query_table(self, table_name : str, column_names : List[str], - start_time : int = INT64_MIN, end_time : int = INT64_MAX) -> ResultSetPy: + start_time : int = INT64_MIN, end_time : int = INT64_MAX, + tag_filter = None, batch_size : int = 0) -> ResultSetPy: """ Execute a time range query on specified table and columns. + :param tag_filter: Optional TagFilter to filter by TAG column values. + :param batch_size: <= 0 for row-by-row mode; > 0 for batch (TsBlock) mode. :return: query result handler. 
""" - cdef ResultSet result; - result = tsfile_reader_query_table_c(self.reader, table_name.lower(), - [column_name.lower() for column_name in column_names], start_time, - end_time) + cdef ResultSet result + cdef TagFilterHandle c_tag_filter = NULL + if tag_filter is not None: + c_tag_filter = self._build_c_tag_filter(table_name.lower(), tag_filter) + if c_tag_filter != NULL: + result = tsfile_reader_query_table_with_tag_filter_c( + self.reader, table_name.lower(), + [column_name.lower() for column_name in column_names], + start_time, end_time, c_tag_filter, batch_size) + else: + result = tsfile_reader_query_table_c( + self.reader, table_name.lower(), + [column_name.lower() for column_name in column_names], + start_time, end_time) pyresult = ResultSetPy(self) + pyresult._tag_filter_handle = c_tag_filter pyresult.init_c(result, table_name) self.activate_result_set_list.add(pyresult) return pyresult + cdef TagFilterHandle _build_c_tag_filter(self, str table_name, object tag_filter): + """Recursively build C TagFilterHandle from Python TagFilter tree.""" + cdef ErrorCode code = 0 + cdef TagFilterHandle handle = NULL + cdef bytes table_bytes + cdef bytes col_bytes + cdef bytes val_bytes + cdef bytes lower_bytes + cdef bytes upper_bytes + + if isinstance(tag_filter, ComparisonTagFilter): + table_bytes = table_name.encode('utf-8') + col_bytes = tag_filter.column_name.encode('utf-8') + val_bytes = tag_filter.value.encode('utf-8') + handle = tsfile_tag_filter_create( + self.reader, <const char*>table_bytes, + <const char*>col_bytes, <const char*>val_bytes, + <TagFilterOp>tag_filter.op, &code) + check_error(code) + return handle + elif isinstance(tag_filter, BetweenTagFilter): + table_bytes = table_name.encode('utf-8') + col_bytes = tag_filter.column_name.encode('utf-8') + lower_bytes = tag_filter.lower.encode('utf-8') + upper_bytes = tag_filter.upper.encode('utf-8') + handle = tsfile_tag_filter_between( + self.reader, <const char*>table_bytes, + <const char*>col_bytes, 
<const char*>lower_bytes, + <const char*>upper_bytes, tag_filter.is_not, &code) + check_error(code) + return handle + elif isinstance(tag_filter, AndTagFilter): + left = self._build_c_tag_filter(table_name, tag_filter.left) + right = self._build_c_tag_filter(table_name, tag_filter.right) + return tsfile_tag_filter_and(left, right) + elif isinstance(tag_filter, OrTagFilter): + left = self._build_c_tag_filter(table_name, tag_filter.left) + right = self._build_c_tag_filter(table_name, tag_filter.right) + return tsfile_tag_filter_or(left, right) + elif isinstance(tag_filter, NotTagFilter): + inner = self._build_c_tag_filter(table_name, tag_filter.filter) + return tsfile_tag_filter_not(inner) + else: + raise TypeError(f"Unknown tag filter type: {type(tag_filter)}") + def query_table_batch(self, table_name : str, column_names : List[str], start_time : int = INT64_MIN, end_time : int = INT64_MAX, batch_size : int = 1024) -> ResultSetPy:
