This is an automated email from the ASF dual-hosted git repository.
colinlee pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new 7d308fe2 support tag filter. (#768)
7d308fe2 is described below
commit 7d308fe254dd10f7ae217553700b34b3f75ad186
Author: Colin Lee <[email protected]>
AuthorDate: Wed Apr 8 16:55:10 2026 +0800
support tag filter. (#768)
---
cpp/src/cwrapper/tsfile_cwrapper.cc | 128 +++++-
cpp/src/cwrapper/tsfile_cwrapper.h | 89 ++++-
cpp/src/reader/table_query_executor.cc | 5 +-
cpp/src/reader/tsfile_reader.cc | 8 +-
cpp/src/reader/tsfile_reader.h | 4 +-
cpp/test/cwrapper/query_by_row_cwrapper_test.cc | 5 +-
.../table_view/tsfile_table_query_by_row_test.cc | 45 +++
python/lower_case_name.tsfile | Bin 23089 -> 0 bytes
python/test1.tsfile | Bin 23089 -> 0 bytes
python/tests/bench_batch_arrow_vs_dataframe.py | 2 +-
python/tests/test_batch_arrow.py | 12 +-
python/tests/test_tag_filter.py | 233 +++++++++++
python/tests/test_tag_filter_query.py | 440 +++++++++++++++++++++
python/tests/test_write_arrow.py | 2 +-
python/tsfile/__init__.py | 2 +
python/tsfile/dataset/reader.py | 4 +-
python/tsfile/tag_filter.py | 151 +++++++
python/tsfile/tsfile_cpp.pxd | 55 ++-
python/tsfile/tsfile_py_cpp.pxd | 11 +-
python/tsfile/tsfile_py_cpp.pyx | 41 +-
python/tsfile/tsfile_reader.pyx | 99 ++++-
21 files changed, 1275 insertions(+), 61 deletions(-)
diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc b/cpp/src/cwrapper/tsfile_cwrapper.cc
index e6ecef2a..e1b5617a 100644
--- a/cpp/src/cwrapper/tsfile_cwrapper.cc
+++ b/cpp/src/cwrapper/tsfile_cwrapper.cc
@@ -32,6 +32,7 @@
#include "common/statistic.h"
#include "common/tablet.h"
#include "common/tsfile_common.h"
+#include "reader/filter/tag_filter.h"
#include "reader/result_set.h"
#include "reader/table_result_set.h"
#include "reader/tsfile_reader.h"
@@ -410,11 +411,10 @@ ResultSet tsfile_reader_query_tree_by_row(TsFileReader reader,
return result_set;
}
-ResultSet tsfile_reader_query_table_by_row(TsFileReader reader,
- const char* table_name,
- char** column_names,
- int column_names_len, int offset,
- int limit, ERRNO* err_code) {
+ResultSet tsfile_reader_query_table_by_row(
+ TsFileReader reader, const char* table_name, char** column_names,
+ int column_names_len, int offset, int limit, TagFilterHandle tag_filter,
+ int batch_size, ERRNO* err_code) {
auto* r = static_cast<storage::TsFileReader*>(reader);
storage::ResultSet* result_set = nullptr;
@@ -427,15 +427,17 @@ ResultSet tsfile_reader_query_table_by_row(TsFileReader reader,
columns.emplace_back(name == nullptr ? "" : std::string(name));
}
- *err_code = r->queryByRow(table_name == nullptr ? "" : table_name, columns,
- offset, limit, result_set);
+ *err_code = r->queryByRow(
+ table_name == nullptr ? "" : table_name, columns, offset, limit,
+ result_set, static_cast<storage::Filter*>(tag_filter), batch_size);
return result_set;
}
ResultSet tsfile_query_table_batch(TsFileReader reader, const char* table_name,
char** columns, uint32_t column_num,
Timestamp start_time, Timestamp end_time,
- int batch_size, ERRNO* err_code) {
+ TagFilterHandle tag_filter, int batch_size,
+ ERRNO* err_code) {
auto* r = static_cast<storage::TsFileReader*>(reader);
storage::ResultSet* table_result_set = nullptr;
std::vector<std::string> column_names;
@@ -443,7 +445,8 @@ ResultSet tsfile_query_table_batch(TsFileReader reader, const char* table_name,
column_names.emplace_back(columns[i]);
}
*err_code = r->query(table_name, column_names, start_time, end_time,
- table_result_set, batch_size);
+ table_result_set,
+ static_cast<storage::Filter*>(tag_filter), batch_size);
return table_result_set;
}
@@ -1479,6 +1482,113 @@ ResultSet _tsfile_reader_query_device(TsFileReader reader,
*err_code = r->query(selected_paths, start_time, end_time, qds);
return qds;
}
+
+// ---------- Tag Filter API ----------
+
+TagFilterHandle tsfile_tag_filter_create(TsFileReader reader,
+ const char* table_name,
+ const char* column_name,
+ const char* value, TagFilterOp op,
+ ERRNO* err_code) {
+ auto* r = static_cast<storage::TsFileReader*>(reader);
+ auto schema = r->get_table_schema(table_name);
+ if (!schema) {
+ *err_code = common::E_INVALID_ARG;
+ return nullptr;
+ }
+ storage::TagFilterBuilder builder(schema.get());
+ storage::Filter* filter = nullptr;
+ switch (op) {
+ case TAG_FILTER_EQ:
+ filter = builder.eq(column_name, value);
+ break;
+ case TAG_FILTER_NEQ:
+ filter = builder.neq(column_name, value);
+ break;
+ case TAG_FILTER_LT:
+ filter = builder.lt(column_name, value);
+ break;
+ case TAG_FILTER_LTEQ:
+ filter = builder.lteq(column_name, value);
+ break;
+ case TAG_FILTER_GT:
+ filter = builder.gt(column_name, value);
+ break;
+ case TAG_FILTER_GTEQ:
+ filter = builder.gteq(column_name, value);
+ break;
+ case TAG_FILTER_REGEXP:
+ filter = builder.reg_exp(column_name, value);
+ break;
+ case TAG_FILTER_NOT_REGEXP:
+ filter = builder.not_reg_exp(column_name, value);
+ break;
+ default:
+ *err_code = common::E_INVALID_ARG;
+ return nullptr;
+ }
+ *err_code = common::E_OK;
+ return static_cast<void*>(filter);
+}
+
+TagFilterHandle tsfile_tag_filter_between(TsFileReader reader,
+ const char* table_name,
+ const char* column_name,
+ const char* lower, const char* upper,
+ bool is_not, ERRNO* err_code) {
+ auto* r = static_cast<storage::TsFileReader*>(reader);
+ auto schema = r->get_table_schema(table_name);
+ if (!schema) {
+ *err_code = common::E_INVALID_ARG;
+ return nullptr;
+ }
+ storage::TagFilterBuilder builder(schema.get());
+ storage::Filter* filter =
+ is_not ? builder.not_between_and(column_name, lower, upper)
+ : builder.between_and(column_name, lower, upper);
+ *err_code = common::E_OK;
+ return static_cast<void*>(filter);
+}
+
+TagFilterHandle tsfile_tag_filter_and(TagFilterHandle left,
+ TagFilterHandle right) {
+ return static_cast<void*>(storage::TagFilterBuilder::and_filter(
+ static_cast<storage::Filter*>(left),
+ static_cast<storage::Filter*>(right)));
+}
+
+TagFilterHandle tsfile_tag_filter_or(TagFilterHandle left,
+ TagFilterHandle right) {
+ return static_cast<void*>(storage::TagFilterBuilder::or_filter(
+ static_cast<storage::Filter*>(left),
+ static_cast<storage::Filter*>(right)));
+}
+
+TagFilterHandle tsfile_tag_filter_not(TagFilterHandle filter) {
+ return static_cast<void*>(storage::TagFilterBuilder::not_filter(
+ static_cast<storage::Filter*>(filter)));
+}
+
+void tsfile_tag_filter_free(TagFilterHandle filter) {
+ delete static_cast<storage::Filter*>(filter);
+}
+
+ResultSet tsfile_query_table_with_tag_filter(
+ TsFileReader reader, const char* table_name, char** columns,
+ uint32_t column_num, Timestamp start_time, Timestamp end_time,
+ TagFilterHandle tag_filter, int batch_size, ERRNO* err_code) {
+ auto* r = static_cast<storage::TsFileReader*>(reader);
+ storage::ResultSet* table_result_set = nullptr;
+ std::vector<std::string> column_names;
+ for (uint32_t i = 0; i < column_num; i++) {
+ column_names.emplace_back(columns[i]);
+ }
+ *err_code = r->query(table_name, column_names, start_time, end_time,
+ table_result_set,
+ static_cast<storage::Filter*>(tag_filter), batch_size);
+ return table_result_set;
+}
+
#ifdef __cplusplus
}
#endif
\ No newline at end of file
diff --git a/cpp/src/cwrapper/tsfile_cwrapper.h b/cpp/src/cwrapper/tsfile_cwrapper.h
index 6c0e6d2c..aa7fd029 100644
--- a/cpp/src/cwrapper/tsfile_cwrapper.h
+++ b/cpp/src/cwrapper/tsfile_cwrapper.h
@@ -252,6 +252,7 @@ typedef void* Tablet;
typedef void* TsRecord;
typedef void* ResultSet;
+typedef void* TagFilterHandle;
typedef struct arrow_schema {
// Array type description
@@ -675,16 +676,16 @@ ResultSet tsfile_reader_query_tree_by_row(TsFileReader reader,
* @param err_code [out] Error code. E_OK(0) on success.
* @return ResultSet handle on success; NULL on failure.
*/
-ResultSet tsfile_reader_query_table_by_row(TsFileReader reader,
- const char* table_name,
- char** column_names,
- int column_names_len, int offset,
- int limit, ERRNO* err_code);
+ResultSet tsfile_reader_query_table_by_row(
+ TsFileReader reader, const char* table_name, char** column_names,
+ int column_names_len, int offset, int limit, TagFilterHandle tag_filter,
+ int batch_size, ERRNO* err_code);
ResultSet tsfile_query_table_batch(TsFileReader reader, const char* table_name,
char** columns, uint32_t column_num,
Timestamp start_time, Timestamp end_time,
- int batch_size, ERRNO* err_code);
+ TagFilterHandle tag_filter, int batch_size,
+ ERRNO* err_code);
// ResultSet tsfile_reader_query_device(TsFileReader reader,
// const char* device_name,
// char** sensor_name, uint32_t
@@ -859,6 +860,82 @@ TableSchema* tsfile_reader_get_all_table_schemas(TsFileReader reader,
DeviceSchema* tsfile_reader_get_all_timeseries_schemas(TsFileReader reader,
uint32_t* size);
+// ---------- Tag Filter API ----------
+
+/**
+ * @brief Tag filter comparison operators.
+ */
+typedef enum {
+ TAG_FILTER_EQ = 0,
+ TAG_FILTER_NEQ = 1,
+ TAG_FILTER_LT = 2,
+ TAG_FILTER_LTEQ = 3,
+ TAG_FILTER_GT = 4,
+ TAG_FILTER_GTEQ = 5,
+ TAG_FILTER_REGEXP = 6,
+ TAG_FILTER_NOT_REGEXP = 7,
+} TagFilterOp;
+
+/**
+ * @brief Create a tag filter with a comparison operator.
+ *
+ * @param reader [in] TsFileReader handle (used to resolve column name to
+ * index).
+ * @param table_name [in] Table name whose schema defines the TAG columns.
+ * @param column_name [in] Name of the TAG column to filter on.
+ * @param value [in] Comparison value (string).
+ * @param op [in] Comparison operator (TagFilterOp).
+ * @param err_code [out] Error code. E_OK(0) on success.
+ * @return TagFilterHandle on success; NULL on failure.
+ */
+TagFilterHandle tsfile_tag_filter_create(TsFileReader reader,
+ const char* table_name,
+ const char* column_name,
+ const char* value, TagFilterOp op,
+ ERRNO* err_code);
+
+/**
+ * @brief Create a BETWEEN tag filter (lower <= column <= upper).
+ */
+TagFilterHandle tsfile_tag_filter_between(TsFileReader reader,
+ const char* table_name,
+ const char* column_name,
+ const char* lower, const char* upper,
+ bool is_not, ERRNO* err_code);
+
+/**
+ * @brief Combine two tag filters with AND.
+ */
+TagFilterHandle tsfile_tag_filter_and(TagFilterHandle left,
+ TagFilterHandle right);
+
+/**
+ * @brief Combine two tag filters with OR.
+ */
+TagFilterHandle tsfile_tag_filter_or(TagFilterHandle left,
+ TagFilterHandle right);
+
+/**
+ * @brief Negate a tag filter.
+ */
+TagFilterHandle tsfile_tag_filter_not(TagFilterHandle filter);
+
+/**
+ * @brief Free a tag filter and all its children.
+ */
+void tsfile_tag_filter_free(TagFilterHandle filter);
+
+/**
+ * @brief Query table with tag filter.
+ *
+ * @param batch_size <= 0 means row-by-row return mode,
+ * > 0 means return TsBlock with the specified block size.
+ */
+ResultSet tsfile_query_table_with_tag_filter(
+ TsFileReader reader, const char* table_name, char** columns,
+ uint32_t column_num, Timestamp start_time, Timestamp end_time,
+ TagFilterHandle tag_filter, int batch_size, ERRNO* err_code);
+
// Close and free resource.
void free_tablet(Tablet* tablet);
void free_tsfile_result_set(ResultSet* result_set);
diff --git a/cpp/src/reader/table_query_executor.cc b/cpp/src/reader/table_query_executor.cc
index c23ffc0f..d5145104 100644
--- a/cpp/src/reader/table_query_executor.cc
+++ b/cpp/src/reader/table_query_executor.cc
@@ -157,8 +157,9 @@ int TableQueryExecutor::query(const std::string& table_name,
return common::E_UNSUPPORTED_ORDER;
}
assert(tsblock_reader != nullptr);
- ret_qds = new TableResultSet(std::move(tsblock_reader),
- lower_case_column_names, data_types);
+ ret_qds =
+ new TableResultSet(std::move(tsblock_reader), lower_case_column_names,
+ data_types, return_mode_);
return ret;
}
diff --git a/cpp/src/reader/tsfile_reader.cc b/cpp/src/reader/tsfile_reader.cc
index 196189f0..ea7b2cc0 100644
--- a/cpp/src/reader/tsfile_reader.cc
+++ b/cpp/src/reader/tsfile_reader.cc
@@ -135,7 +135,8 @@ int TsFileReader::queryByRow(std::vector<std::string>& path_list, int offset,
int TsFileReader::queryByRow(const std::string& table_name,
const std::vector<std::string>& column_names,
- int offset, int limit, ResultSet*& result_set) {
+ int offset, int limit, ResultSet*& result_set,
+ Filter* tag_filter, int batch_size) {
int ret = E_OK;
TsFileMeta* tsfile_meta = tsfile_executor_->get_tsfile_meta();
if (tsfile_meta == nullptr) {
@@ -147,11 +148,10 @@ int TsFileReader::queryByRow(const std::string& table_name,
}
if (table_query_executor_ == nullptr) {
- table_query_executor_ = new TableQueryExecutor(read_file_);
+ table_query_executor_ = new TableQueryExecutor(read_file_, batch_size);
}
ret = table_query_executor_->query(to_lower(table_name), column_names,
- /*time_filter=*/nullptr,
- /*tag_filter=*/nullptr,
+ /*time_filter=*/nullptr, tag_filter,
/*field_filter=*/nullptr, offset, limit,
result_set);
return ret;
diff --git a/cpp/src/reader/tsfile_reader.h b/cpp/src/reader/tsfile_reader.h
index 324d202d..e78a38ac 100644
--- a/cpp/src/reader/tsfile_reader.h
+++ b/cpp/src/reader/tsfile_reader.h
@@ -143,11 +143,13 @@ class TsFileReader {
* @param offset Number of leading rows to skip (>= 0).
* @param limit Maximum rows to return. < 0 means unlimited.
* @param[out] result_set The result set containing query results.
+ * @param tag_filter Optional tag filter for filtering by tag columns.
* @return Returns 0 on success, or a non-zero error code on failure.
*/
int queryByRow(const std::string& table_name,
const std::vector<std::string>& column_names, int offset,
- int limit, ResultSet*& result_set);
+ int limit, ResultSet*& result_set,
+ Filter* tag_filter = nullptr, int batch_size = 0);
int query_table_on_tree(const std::vector<std::string>& measurement_names,
int64_t star_time, int64_t end_time,
diff --git a/cpp/test/cwrapper/query_by_row_cwrapper_test.cc b/cpp/test/cwrapper/query_by_row_cwrapper_test.cc
index a8462597..3de447ff 100644
--- a/cpp/test/cwrapper/query_by_row_cwrapper_test.cc
+++ b/cpp/test/cwrapper/query_by_row_cwrapper_test.cc
@@ -215,8 +215,9 @@ TEST_F(CWrapperQueryByRowTest, TableByRowOffsetLimit) {
const int offset = 3;
const int limit = 5;
- ResultSet rs = tsfile_reader_query_table_by_row(
- reader, table_name.c_str(), column_names_c, 2, offset, limit, &code);
+ ResultSet rs = tsfile_reader_query_table_by_row(reader, table_name.c_str(),
+ column_names_c, 2, offset,
+ limit, NULL, 0, &code);
ASSERT_EQ(code, RET_OK);
ASSERT_NE(rs, nullptr);
diff --git a/cpp/test/reader/table_view/tsfile_table_query_by_row_test.cc b/cpp/test/reader/table_view/tsfile_table_query_by_row_test.cc
index 71440208..026f75b2 100644
--- a/cpp/test/reader/table_view/tsfile_table_query_by_row_test.cc
+++ b/cpp/test/reader/table_view/tsfile_table_query_by_row_test.cc
@@ -27,6 +27,7 @@
#include "common/schema.h"
#include "common/tablet.h"
#include "file/write_file.h"
+#include "reader/filter/tag_filter.h"
#include "reader/table_result_set.h"
#include "reader/tsfile_reader.h"
#include "writer/tsfile_table_writer.h"
@@ -724,3 +725,47 @@ TEST_F(TableQueryByRowTest, DISABLED_QueryByRowFasterThanManualNext) {
"(min_by_row="
<< min_by_row << " ms, min_manual=" << min_manual << " ms)";
}
+
+// queryByRow with tag filter: only rows matching the tag predicate are
+// returned.
+TEST_F(TableQueryByRowTest, TagFilterEq) {
+ int rows_per_device = 20;
+ int device_count = 3;
+ write_multi_device_file(rows_per_device, device_count);
+
+ // Reconstruct the same schema used by write_multi_device_file.
+ std::vector<ColumnSchema> col_schemas = {
+ ColumnSchema("id1", TSDataType::STRING, CompressionType::UNCOMPRESSED,
+ TSEncoding::PLAIN, ColumnCategory::TAG),
+ ColumnSchema("s1", TSDataType::INT64, CompressionType::UNCOMPRESSED,
+ TSEncoding::PLAIN, ColumnCategory::FIELD),
+ };
+ TableSchema schema("t1", col_schemas);
+
+ // Build tag filter: id1 == "dev1"
+ TagFilterBuilder builder(&schema);
+ Filter* tag_filter = builder.eq("id1", "dev1");
+
+ TsFileReader reader;
+ ASSERT_EQ(reader.open(file_name_), E_OK);
+
+ ResultSet* rs = nullptr;
+ ASSERT_EQ(reader.queryByRow("t1", {"id1", "s1"}, 0, -1, rs, tag_filter),
+ E_OK);
+ ASSERT_NE(rs, nullptr);
+
+ std::vector<int64_t> filtered_s1;
+ bool has_next = false;
+ while (IS_SUCC(rs->next(has_next)) && has_next) {
+ filtered_s1.push_back(rs->get_value<int64_t>("s1"));
+ }
+ reader.destroy_query_data_set(rs);
+ reader.close();
+ delete tag_filter;
+
+ // dev1 has rows_per_device rows with s1 = 1*1000+t for t in [0,20).
+ ASSERT_EQ(filtered_s1.size(), static_cast<size_t>(rows_per_device));
+ for (int t = 0; t < rows_per_device; t++) {
+ EXPECT_EQ(filtered_s1[t], static_cast<int64_t>(1 * 1000 + t));
+ }
+}
diff --git a/python/lower_case_name.tsfile b/python/lower_case_name.tsfile
deleted file mode 100644
index d4717671..00000000
Binary files a/python/lower_case_name.tsfile and /dev/null differ
diff --git a/python/test1.tsfile b/python/test1.tsfile
deleted file mode 100644
index 1141f08d..00000000
Binary files a/python/test1.tsfile and /dev/null differ
diff --git a/python/tests/bench_batch_arrow_vs_dataframe.py b/python/tests/bench_batch_arrow_vs_dataframe.py
index e1f6c421..0e34347f 100644
--- a/python/tests/bench_batch_arrow_vs_dataframe.py
+++ b/python/tests/bench_batch_arrow_vs_dataframe.py
@@ -83,7 +83,7 @@ def _ensure_bench_tsfile(file_path: str, row_count: int) -> None:
def _read_via_arrow(file_path: str, batch_size: int, end_time: int) -> int:
"""Read all rows using query_table_batch + read_arrow_batch. Returns total rows."""
reader = TsFileReader(file_path)
- result_set = reader.query_table_batch(
+ result_set = reader.query_table(
table_name=TABLE_NAME,
column_names=COLUMNS,
start_time=0,
diff --git a/python/tests/test_batch_arrow.py b/python/tests/test_batch_arrow.py
index 75bd6c4c..9f144389 100644
--- a/python/tests/test_batch_arrow.py
+++ b/python/tests/test_batch_arrow.py
@@ -62,7 +62,7 @@ def test_batch_read_arrow_basic():
pytest.skip("pyarrow is not installed")
reader = TsFileReader(file_path)
- result_set = reader.query_table_batch(
+ result_set = reader.query_table(
table_name="test_table",
column_names=["device", "value1", "value2"],
start_time=0,
@@ -135,7 +135,7 @@ def test_batch_read_arrow_compare_with_dataframe():
pytest.skip("pyarrow is not installed")
reader1 = TsFileReader(file_path)
- result_set1 = reader1.query_table_batch(
+ result_set1 = reader1.query_table(
table_name="test_table",
column_names=["device", "value1", "value2", "value3"],
start_time=0,
@@ -224,7 +224,7 @@ def test_batch_read_arrow_empty_result():
pytest.skip("pyarrow is not installed")
reader = TsFileReader(file_path)
- result_set = reader.query_table_batch(
+ result_set = reader.query_table(
table_name="test_table",
column_names=["device", "value"],
start_time=1000,
@@ -276,7 +276,7 @@ def test_batch_read_arrow_time_range():
pytest.skip("pyarrow is not installed")
reader = TsFileReader(file_path)
- result_set = reader.query_table_batch(
+ result_set = reader.query_table(
table_name="test_table",
column_names=["device", "value"],
start_time=100,
@@ -357,7 +357,7 @@ def test_batch_read_arrow_all_datatypes():
pytest.skip("pyarrow is not installed")
reader = TsFileReader(file_path)
- result_set = reader.query_table_batch(
+ result_set = reader.query_table(
table_name="test_table",
column_names=["device", "bool_val", "int32_val", "int64_val",
"float_val", "double_val", "string_val", "date_val"],
start_time=0,
@@ -421,7 +421,7 @@ def test_batch_read_arrow_no_pyarrow():
writer.write_table(tablet)
reader = TsFileReader(file_path)
- result_set = reader.query_table_batch(
+ result_set = reader.query_table(
table_name="test_table",
column_names=["device", "value"],
start_time=0,
diff --git a/python/tests/test_tag_filter.py b/python/tests/test_tag_filter.py
new file mode 100644
index 00000000..69f48af5
--- /dev/null
+++ b/python/tests/test_tag_filter.py
@@ -0,0 +1,233 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+
+import pytest
+
+from tsfile import (
+ ColumnSchema, TableSchema, TSDataType, ColumnCategory,
+ TsFileTableWriter, TsFileReader, Tablet,
+ tag_eq, tag_neq, tag_lt, tag_lteq, tag_gt, tag_gteq,
+ tag_regexp, tag_not_regexp, tag_between, tag_not_between,
+)
+
+TSFILE_PATH = "test_tag_filter.tsfile"
+
+# Schema: table "sensors" with TAG columns "region" and "device",
+# and FIELD column "value" (DOUBLE).
+TABLE_NAME = "sensors"
+COLUMNS = [
+ ColumnSchema("region", TSDataType.STRING, ColumnCategory.TAG),
+ ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
+ ColumnSchema("value", TSDataType.DOUBLE, ColumnCategory.FIELD),
+]
+SCHEMA = TableSchema(TABLE_NAME, COLUMNS)
+
+# Test data:
+# region | device | timestamps | values
+# north | dev_a | 0..4 | 0.0..4.0
+# north | dev_b | 5..9 | 5.0..9.0
+# south | dev_c | 10..14 | 10.0..14.0
+# east | dev_d | 15..19 | 15.0..19.0
+DEVICES = [
+ ("north", "dev_a", 0, 5),
+ ("north", "dev_b", 5, 10),
+ ("south", "dev_c", 10, 15),
+ ("east", "dev_d", 15, 20),
+]
+
+
+@pytest.fixture(scope="module", autouse=True)
+def create_tsfile():
+ """Write the test TsFile once for all tests in this module."""
+ if os.path.exists(TSFILE_PATH):
+ os.remove(TSFILE_PATH)
+
+ with TsFileTableWriter(TSFILE_PATH, SCHEMA) as writer:
+ for region, device, start, end in DEVICES:
+ count = end - start
+ tablet = Tablet(
+ ["region", "device", "value"],
+ [TSDataType.STRING, TSDataType.STRING, TSDataType.DOUBLE],
+ count,
+ )
+ for i in range(count):
+ ts = start + i
+ tablet.add_timestamp(i, ts)
+ tablet.add_value_by_name("region", i, region)
+ tablet.add_value_by_name("device", i, device)
+ tablet.add_value_by_name("value", i, float(ts))
+ writer.write_table(tablet)
+
+ yield
+
+ if os.path.exists(TSFILE_PATH):
+ os.remove(TSFILE_PATH)
+
+
+def _query_values(reader, tag_filter):
+ """Helper: query all columns with the given tag_filter, return list of (region, device, value) tuples."""
+ result = reader.query_table(TABLE_NAME, ["region", "device", "value"], tag_filter=tag_filter)
+ rows = []
+ while result.next():
+ region = result.get_value_by_name("region")
+ device = result.get_value_by_name("device")
+ value = result.get_value_by_name("value")
+ rows.append((region, device, value))
+ result.close()
+ return rows
+
+
+def test_tag_eq():
+ with TsFileReader(TSFILE_PATH) as reader:
+ rows = _query_values(reader, tag_eq("region", "north"))
+ assert len(rows) == 10 # dev_a (5) + dev_b (5)
+ assert all(r[0] == "north" for r in rows)
+
+
+def test_tag_neq():
+ with TsFileReader(TSFILE_PATH) as reader:
+ rows = _query_values(reader, tag_neq("region", "north"))
+ assert len(rows) == 10 # south (5) + east (5)
+ assert all(r[0] != "north" for r in rows)
+
+
+def test_tag_eq_device():
+ with TsFileReader(TSFILE_PATH) as reader:
+ rows = _query_values(reader, tag_eq("device", "dev_c"))
+ assert len(rows) == 5
+ assert all(r[1] == "dev_c" for r in rows)
+ assert all(r[0] == "south" for r in rows)
+
+
+def test_tag_lt():
+ with TsFileReader(TSFILE_PATH) as reader:
+ # Lexicographic: "east" < "north" < "south"
+ rows = _query_values(reader, tag_lt("region", "north"))
+ assert len(rows) == 5 # only "east"
+ assert all(r[0] == "east" for r in rows)
+
+
+def test_tag_lteq():
+ with TsFileReader(TSFILE_PATH) as reader:
+ rows = _query_values(reader, tag_lteq("region", "north"))
+ assert len(rows) == 15 # "east" (5) + "north" (10)
+
+
+def test_tag_gt():
+ with TsFileReader(TSFILE_PATH) as reader:
+ rows = _query_values(reader, tag_gt("region", "north"))
+ assert len(rows) == 5 # only "south"
+ assert all(r[0] == "south" for r in rows)
+
+
+def test_tag_gteq():
+ with TsFileReader(TSFILE_PATH) as reader:
+ rows = _query_values(reader, tag_gteq("region", "north"))
+ assert len(rows) == 15 # "north" (10) + "south" (5)
+
+
+def test_tag_between():
+ with TsFileReader(TSFILE_PATH) as reader:
+ # Between "east" and "north" (inclusive)
+ rows = _query_values(reader, tag_between("region", "east", "north"))
+ assert len(rows) == 15 # "east" (5) + "north" (10)
+
+
+def test_tag_not_between():
+ with TsFileReader(TSFILE_PATH) as reader:
+ rows = _query_values(reader, tag_not_between("region", "east", "north"))
+ assert len(rows) == 5 # only "south"
+ assert all(r[0] == "south" for r in rows)
+
+
+def test_tag_regexp():
+ with TsFileReader(TSFILE_PATH) as reader:
+ # Match regions starting with 'n' or 's'
+ rows = _query_values(reader, tag_regexp("region", "^[ns]"))
+ assert len(rows) == 15 # "north" (10) + "south" (5)
+
+
+def test_tag_not_regexp():
+ with TsFileReader(TSFILE_PATH) as reader:
+ rows = _query_values(reader, tag_not_regexp("region", "^[ns]"))
+ assert len(rows) == 5 # only "east"
+ assert all(r[0] == "east" for r in rows)
+
+
+def test_tag_and():
+ with TsFileReader(TSFILE_PATH) as reader:
+ f = tag_eq("region", "north") & tag_eq("device", "dev_a")
+ rows = _query_values(reader, f)
+ assert len(rows) == 5
+ assert all(r[0] == "north" and r[1] == "dev_a" for r in rows)
+
+
+def test_tag_or():
+ with TsFileReader(TSFILE_PATH) as reader:
+ f = tag_eq("region", "south") | tag_eq("region", "east")
+ rows = _query_values(reader, f)
+ assert len(rows) == 10
+ regions = {r[0] for r in rows}
+ assert regions == {"south", "east"}
+
+
+def test_tag_not():
+ with TsFileReader(TSFILE_PATH) as reader:
+ f = ~tag_eq("region", "north")
+ rows = _query_values(reader, f)
+ assert len(rows) == 10 # south + east
+ assert all(r[0] != "north" for r in rows)
+
+
+def test_tag_complex_combination():
+ with TsFileReader(TSFILE_PATH) as reader:
+ # (region == "north" AND device == "dev_b") OR region == "east"
+ f = (tag_eq("region", "north") & tag_eq("device", "dev_b")) | tag_eq("region", "east")
+ rows = _query_values(reader, f)
+ assert len(rows) == 10 # dev_b (5) + east (5)
+ for r in rows:
+ assert (r[0] == "north" and r[1] == "dev_b") or r[0] == "east"
+
+
+def test_no_tag_filter():
+ """Verify that query_table without tag_filter still works (backward compat)."""
+ with TsFileReader(TSFILE_PATH) as reader:
+ result = reader.query_table(TABLE_NAME, ["region", "device", "value"])
+ rows = []
+ while result.next():
+ rows.append(result.get_value_by_name("region"))
+ result.close()
+ assert len(rows) == 20
+
+
+def test_tag_filter_with_time_range():
+ """Tag filter combined with time range."""
+ with TsFileReader(TSFILE_PATH) as reader:
+ result = reader.query_table(
+ TABLE_NAME, ["region", "device", "value"],
+ start_time=0, end_time=7,
+ tag_filter=tag_eq("region", "north"),
+ )
+ rows = []
+ while result.next():
+ rows.append(result.get_value_by_name("value"))
+ result.close()
+ # north has timestamps 0..9, but time range 0..7 gives 8 rows
+ assert len(rows) == 8
diff --git a/python/tests/test_tag_filter_query.py b/python/tests/test_tag_filter_query.py
new file mode 100644
index 00000000..513fd0c4
--- /dev/null
+++ b/python/tests/test_tag_filter_query.py
@@ -0,0 +1,440 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+
+import pyarrow as pa
+import pytest
+
+from tsfile import (
+ ColumnSchema, TableSchema, TSDataType, ColumnCategory,
+ TsFileTableWriter, TsFileReader, Tablet,
+ tag_eq, tag_gteq, TIME_COLUMN,
+)
+
+# Relative path of the TsFile shared by every test in this module.
+TSFILE_PATH = "test_tag_filter_query.tsfile"
+
+# Single table with two TAG columns (region, device) and one FIELD column.
+TABLE_NAME = "sensors"
+COLUMNS = [
+ ColumnSchema("region", TSDataType.STRING, ColumnCategory.TAG),
+ ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
+ ColumnSchema("value", TSDataType.DOUBLE, ColumnCategory.FIELD),
+]
+SCHEMA = TableSchema(TABLE_NAME, COLUMNS)
+
+# Test data:
+# region | device | timestamps | values
+# north | dev_a | 0..4 | 0.0..4.0
+# north | dev_b | 5..9 | 5.0..9.0
+# south | dev_c | 10..14 | 10.0..14.0
+# east | dev_d | 15..19 | 15.0..19.0
+# Each entry is (region, device, start, end); `end` is exclusive, so every
+# device contributes end - start rows (5 each, 20 in total).
+DEVICES = [
+ ("north", "dev_a", 0, 5),
+ ("north", "dev_b", 5, 10),
+ ("south", "dev_c", 10, 15),
+ ("east", "dev_d", 15, 20),
+]
+
+
+@pytest.fixture(scope="module", autouse=True)
+def create_tsfile():
+    """Write the shared test TsFile once per module and delete it afterwards.
+
+    One tablet is written per DEVICES entry; each row's "value" is its
+    timestamp as a float.
+    """
+    # Start from a clean slate in case a previous run left the file behind.
+    if os.path.exists(TSFILE_PATH):
+        os.remove(TSFILE_PATH)
+
+    with TsFileTableWriter(TSFILE_PATH, SCHEMA) as writer:
+        for region, device, start, end in DEVICES:
+            count = end - start  # `end` is exclusive
+            tablet = Tablet(
+                ["region", "device", "value"],
+                [TSDataType.STRING, TSDataType.STRING, TSDataType.DOUBLE],
+                count,
+            )
+            for i in range(count):
+                ts = start + i
+                tablet.add_timestamp(i, ts)
+                tablet.add_value_by_name("region", i, region)
+                tablet.add_value_by_name("device", i, device)
+                tablet.add_value_by_name("value", i, float(ts))
+            writer.write_table(tablet)
+
+    yield
+
+    # Teardown: remove the generated file.
+    if os.path.exists(TSFILE_PATH):
+        os.remove(TSFILE_PATH)
+
+
+# ---------------------------------------------------------------------------
+# Helper: collect all rows from scalar (row-by-row) iteration
+# ---------------------------------------------------------------------------
+def _scalar_rows(result):
+    """Drain ``result`` row by row into (region, device, value) tuples."""
+    wanted = ("region", "device", "value")
+    collected = []
+    while result.next():
+        collected.append(
+            tuple(result.get_value_by_name(name) for name in wanted)
+        )
+    return collected
+
+
+# ---------------------------------------------------------------------------
+# Helper: collect all rows from arrow batch iteration
+# ---------------------------------------------------------------------------
+def _arrow_rows(result):
+    """Read every arrow batch from ``result`` and flatten to row tuples.
+
+    Returns an empty list when the result set yields no batch at all.
+    """
+    batches = []
+    batch = result.read_arrow_batch()
+    while batch is not None:
+        batches.append(batch)
+        batch = result.read_arrow_batch()
+    if len(batches) == 0:
+        return []
+    merged = pa.concat_tables(batches)
+    return [
+        (
+            merged.column("region")[idx].as_py(),
+            merged.column("device")[idx].as_py(),
+            merged.column("value")[idx].as_py(),
+        )
+        for idx in range(merged.num_rows)
+    ]
+
+
+# ===========================================================================
+# query_table with tag_filter — scalar mode
+# ===========================================================================
+class TestQueryTableTagFilterScalar:
+    """``query_table`` with a tag filter, consumed row by row."""
+
+    @staticmethod
+    def _fetch(tag_filter, **query_kwargs):
+        """Run ``query_table`` on a fresh reader and return all rows."""
+        with TsFileReader(TSFILE_PATH) as reader:
+            with reader.query_table(
+                TABLE_NAME, ["region", "device", "value"],
+                tag_filter=tag_filter, **query_kwargs
+            ) as result:
+                return _scalar_rows(result)
+
+    def test_eq_filter(self):
+        rows = self._fetch(tag_eq("region", "north"))
+        assert len(rows) == 10
+        assert all(row[0] == "north" for row in rows)
+
+    def test_and_filter(self):
+        north_dev_a = tag_eq("region", "north") & tag_eq("device", "dev_a")
+        rows = self._fetch(north_dev_a)
+        assert len(rows) == 5
+        assert all(row[0] == "north" and row[1] == "dev_a" for row in rows)
+
+    def test_or_filter(self):
+        south_or_east = tag_eq("region", "south") | tag_eq("region", "east")
+        rows = self._fetch(south_or_east)
+        assert len(rows) == 10
+        assert {row[0] for row in rows} == {"south", "east"}
+
+    def test_with_time_range(self):
+        rows = self._fetch(tag_eq("region", "north"), start_time=0, end_time=7)
+        assert len(rows) == 8
+        assert all(row[0] == "north" for row in rows)
+
+    def test_no_match(self):
+        rows = self._fetch(tag_eq("region", "west"))
+        assert len(rows) == 0
+
+
+# ===========================================================================
+# query_table with tag_filter — arrow batch mode
+# ===========================================================================
+class TestQueryTableTagFilterArrow:
+    """``query_table`` with a tag filter, consumed as arrow batches."""
+
+    @staticmethod
+    def _fetch(tag_filter, batch_size=1024, **query_kwargs):
+        """Run a batched ``query_table`` on a fresh reader; return all rows."""
+        with TsFileReader(TSFILE_PATH) as reader:
+            with reader.query_table(
+                TABLE_NAME, ["region", "device", "value"],
+                tag_filter=tag_filter, batch_size=batch_size, **query_kwargs
+            ) as result:
+                return _arrow_rows(result)
+
+    def test_eq_filter(self):
+        rows = self._fetch(tag_eq("region", "north"))
+        assert len(rows) == 10
+        assert all(row[0] == "north" for row in rows)
+
+    def test_and_filter(self):
+        north_dev_b = tag_eq("region", "north") & tag_eq("device", "dev_b")
+        rows = self._fetch(north_dev_b)
+        assert len(rows) == 5
+        assert all(row[0] == "north" and row[1] == "dev_b" for row in rows)
+
+    def test_or_filter(self):
+        south_or_east = tag_eq("region", "south") | tag_eq("region", "east")
+        rows = self._fetch(south_or_east)
+        assert len(rows) == 10
+        assert {row[0] for row in rows} == {"south", "east"}
+
+    def test_with_time_range(self):
+        rows = self._fetch(tag_eq("region", "north"), start_time=0, end_time=7)
+        assert len(rows) == 8
+
+    def test_small_batch_size(self):
+        # A batch size smaller than the match count forces several batches.
+        rows = self._fetch(tag_eq("region", "north"), batch_size=3)
+        assert len(rows) == 10
+        assert all(row[0] == "north" for row in rows)
+
+    def test_no_match(self):
+        rows = self._fetch(tag_eq("region", "west"))
+        assert len(rows) == 0
+
+
+# ===========================================================================
+# query_table_by_row with tag_filter — scalar mode
+# ===========================================================================
+class TestQueryTableByRowTagFilterScalar:
+    """``query_table_by_row`` with a tag filter, consumed row by row."""
+
+    @staticmethod
+    def _fetch(tag_filter, **query_kwargs):
+        """Run ``query_table_by_row`` on a fresh reader; return all rows."""
+        with TsFileReader(TSFILE_PATH) as reader:
+            with reader.query_table_by_row(
+                TABLE_NAME, ["region", "device", "value"],
+                tag_filter=tag_filter, **query_kwargs
+            ) as result:
+                return _scalar_rows(result)
+
+    def test_eq_filter(self):
+        rows = self._fetch(tag_eq("region", "north"))
+        assert len(rows) == 10
+        assert all(row[0] == "north" for row in rows)
+
+    def test_and_filter(self):
+        north_dev_a = tag_eq("region", "north") & tag_eq("device", "dev_a")
+        rows = self._fetch(north_dev_a)
+        assert len(rows) == 5
+        assert all(row[0] == "north" and row[1] == "dev_a" for row in rows)
+
+    def test_or_filter(self):
+        south_or_east = tag_eq("region", "south") | tag_eq("region", "east")
+        rows = self._fetch(south_or_east)
+        assert len(rows) == 10
+        assert {row[0] for row in rows} == {"south", "east"}
+
+    def test_with_offset_limit(self):
+        rows = self._fetch(tag_eq("region", "north"), offset=2, limit=3)
+        assert len(rows) == 3
+
+    def test_gteq_filter(self):
+        rows = self._fetch(tag_gteq("region", "north"))
+        # "north" (10) + "south" (5) = 15
+        assert len(rows) == 15
+        assert all(row[0] in ("north", "south") for row in rows)
+
+    def test_no_match(self):
+        rows = self._fetch(tag_eq("region", "west"))
+        assert len(rows) == 0
+
+
+# ===========================================================================
+# query_table_by_row with tag_filter — arrow batch mode
+# ===========================================================================
+class TestQueryTableByRowTagFilterArrow:
+    """``query_table_by_row`` with a tag filter, consumed as arrow batches."""
+
+    @staticmethod
+    def _fetch(tag_filter, batch_size=1024, **query_kwargs):
+        """Run a batched ``query_table_by_row``; return all rows."""
+        with TsFileReader(TSFILE_PATH) as reader:
+            with reader.query_table_by_row(
+                TABLE_NAME, ["region", "device", "value"],
+                tag_filter=tag_filter, batch_size=batch_size, **query_kwargs
+            ) as result:
+                return _arrow_rows(result)
+
+    def test_eq_filter(self):
+        rows = self._fetch(tag_eq("region", "north"))
+        assert len(rows) == 10
+        assert all(row[0] == "north" for row in rows)
+
+    def test_and_filter(self):
+        north_dev_b = tag_eq("region", "north") & tag_eq("device", "dev_b")
+        rows = self._fetch(north_dev_b)
+        assert len(rows) == 5
+        assert all(row[0] == "north" and row[1] == "dev_b" for row in rows)
+
+    def test_or_filter(self):
+        south_or_east = tag_eq("region", "south") | tag_eq("region", "east")
+        rows = self._fetch(south_or_east)
+        assert len(rows) == 10
+        assert {row[0] for row in rows} == {"south", "east"}
+
+    def test_with_offset_limit(self):
+        rows = self._fetch(tag_eq("region", "north"), offset=0, limit=6)
+        assert len(rows) == 6
+        assert all(row[0] == "north" for row in rows)
+
+    def test_small_batch_size(self):
+        # A batch size smaller than the match count forces several batches.
+        rows = self._fetch(tag_eq("region", "south"), batch_size=2)
+        assert len(rows) == 5
+        assert all(row[0] == "south" for row in rows)
+
+    def test_no_match(self):
+        rows = self._fetch(tag_eq("region", "west"))
+        assert len(rows) == 0
+
+
+# ===========================================================================
+# Cross-check: scalar vs arrow return same data
+# ===========================================================================
+class TestScalarArrowConsistency:
+    """Row-by-row and arrow-batch reads of one query must agree."""
+
+    _COLUMNS = ("region", "device", "value")
+
+    @staticmethod
+    def _assert_same_rows(scalar_rows, arrow_rows):
+        """Compare both row lists after sorting; values compared approximately."""
+        assert len(scalar_rows) == len(arrow_rows)
+        for scalar, arrow in zip(sorted(scalar_rows), sorted(arrow_rows)):
+            assert scalar[0] == arrow[0]
+            assert scalar[1] == arrow[1]
+            assert scalar[2] == pytest.approx(arrow[2])
+
+    def test_query_table_scalar_vs_arrow(self):
+        f = tag_eq("region", "north") & tag_eq("device", "dev_a")
+
+        # Each mode gets its own reader instance.
+        with TsFileReader(TSFILE_PATH) as reader:
+            with reader.query_table(
+                TABLE_NAME, list(self._COLUMNS), tag_filter=f
+            ) as result:
+                scalar_rows = _scalar_rows(result)
+
+        with TsFileReader(TSFILE_PATH) as reader:
+            with reader.query_table(
+                TABLE_NAME, list(self._COLUMNS), tag_filter=f, batch_size=1024
+            ) as result:
+                arrow_rows = _arrow_rows(result)
+
+        self._assert_same_rows(scalar_rows, arrow_rows)
+
+    def test_query_table_by_row_scalar_vs_arrow(self):
+        f = tag_eq("region", "south")
+
+        with TsFileReader(TSFILE_PATH) as reader:
+            with reader.query_table_by_row(
+                TABLE_NAME, list(self._COLUMNS), tag_filter=f
+            ) as result:
+                scalar_rows = _scalar_rows(result)
+
+        with TsFileReader(TSFILE_PATH) as reader:
+            with reader.query_table_by_row(
+                TABLE_NAME, list(self._COLUMNS), tag_filter=f, batch_size=1024
+            ) as result:
+                arrow_rows = _arrow_rows(result)
+
+        self._assert_same_rows(scalar_rows, arrow_rows)
diff --git a/python/tests/test_write_arrow.py b/python/tests/test_write_arrow.py
index 19c8abc2..5621d22e 100644
--- a/python/tests/test_write_arrow.py
+++ b/python/tests/test_write_arrow.py
@@ -48,7 +48,7 @@ def _make_schema(table_name, extra_cols):
def _read_all_arrow(file_path, table_name, columns, start=0, end=10**18,
batch_size=4096):
"""Read all rows from file via read_arrow_batch and return as a
pa.Table."""
reader = TsFileReader(file_path)
- rs = reader.query_table_batch(
+ rs = reader.query_table(
table_name=table_name,
column_names=columns,
start_time=start,
diff --git a/python/tsfile/__init__.py b/python/tsfile/__init__.py
index 84ca330e..31390d9c 100644
--- a/python/tsfile/__init__.py
+++ b/python/tsfile/__init__.py
@@ -37,6 +37,8 @@ from .field import *
from .date_utils import *
from .exceptions import *
from .tsfile_reader import TsFileReaderPy as TsFileReader, ResultSetPy as
ResultSet
+from .tag_filter import (TagFilter, tag_eq, tag_neq, tag_lt, tag_lteq, tag_gt,
tag_gteq,
+ tag_regexp, tag_not_regexp, tag_between,
tag_not_between)
from .tsfile_writer import TsFileWriterPy as TsFileWriter
from .tsfile_py_cpp import get_tsfile_config, set_tsfile_config
from .tsfile_table_writer import TsFileTableWriter
diff --git a/python/tsfile/dataset/reader.py b/python/tsfile/dataset/reader.py
index 365dc884..d7358023 100644
--- a/python/tsfile/dataset/reader.py
+++ b/python/tsfile/dataset/reader.py
@@ -144,7 +144,7 @@ class TsFileSeriesReader:
# [Temporary] It will be replaced by new tsfile api, we won't
query all the data later.
query_columns = tag_columns + field_columns
- with self._reader.query_table_batch(table_name, query_columns,
batch_size=65536) as result_set:
+ with self._reader.query_table(table_name, query_columns,
batch_size=65536) as result_set:
while True:
arrow_table = result_set.read_arrow_batch()
if arrow_table is None:
@@ -310,7 +310,7 @@ class TsFileSeriesReader:
timestamp_parts = []
field_parts = {field_column: [] for field_column in field_columns}
- with self._reader.query_table_batch(
+ with self._reader.query_table(
table_name,
query_columns,
start_time=start_time,
diff --git a/python/tsfile/tag_filter.py b/python/tsfile/tag_filter.py
new file mode 100644
index 00000000..e5107263
--- /dev/null
+++ b/python/tsfile/tag_filter.py
@@ -0,0 +1,151 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+class TagFilter:
+    """Base class for tag filters used in table queries.
+
+    Filters compose with ``&`` (AND), ``|`` (OR) and ``~`` (NOT); each
+    operator returns a new filter node wrapping its operand(s).
+    """
+
+    def __and__(self, other):
+        # Reject non-filter operands at composition time with the standard
+        # TypeError instead of building a tree that only fails at query time.
+        if not isinstance(other, TagFilter):
+            return NotImplemented
+        return AndTagFilter(self, other)
+
+    def __or__(self, other):
+        if not isinstance(other, TagFilter):
+            return NotImplemented
+        return OrTagFilter(self, other)
+
+    def __invert__(self):
+        return NotTagFilter(self)
+
+
+class ComparisonTagFilter(TagFilter):
+    """A tag filter comparing a column to a value with a given operator.
+
+    :param column_name: name of the tag column to test.
+    :param value: string the column is compared against (a pattern for the
+        REGEXP / NOT_REGEXP operators).
+    :param op: one of the operator constants defined on this class.
+    """
+
+    # Operator constants matching TagFilterOp enum in C
+    EQ = 0
+    NEQ = 1
+    LT = 2
+    LTEQ = 3
+    GT = 4
+    GTEQ = 5
+    REGEXP = 6
+    NOT_REGEXP = 7
+
+    # Display symbol per operator code, built once instead of on every repr.
+    _OP_SYMBOLS = {
+        EQ: "==", NEQ: "!=", LT: "<", LTEQ: "<=", GT: ">", GTEQ: ">=",
+        REGEXP: "=~", NOT_REGEXP: "!~",
+    }
+
+    def __init__(self, column_name: str, value: str, op: int):
+        self.column_name = column_name
+        self.value = value
+        self.op = op
+
+    def __repr__(self):
+        # Unknown operator codes render as "?" rather than raising.
+        symbol = self._OP_SYMBOLS.get(self.op, '?')
+        return f"TagFilter({self.column_name} {symbol} {self.value!r})"
+
+
+class BetweenTagFilter(TagFilter):
+    """A tag filter for range queries.
+
+    :param column_name: name of the tag column to test.
+    :param lower: lower bound of the range.
+    :param upper: upper bound of the range.
+    :param is_not: when True the filter represents NOT BETWEEN.
+    """
+
+    def __init__(self, column_name: str, lower: str, upper: str,
+                 is_not: bool = False):
+        self.column_name = column_name
+        self.lower = lower
+        self.upper = upper
+        self.is_not = is_not
+
+    def __repr__(self):
+        op = "NOT BETWEEN" if self.is_not else "BETWEEN"
+        return (f"TagFilter({self.column_name} {op} "
+                f"{self.lower!r} AND {self.upper!r})")
+
+
+class AndTagFilter(TagFilter):
+    """Conjunction node holding a left and a right operand filter."""
+
+    def __init__(self, left: TagFilter, right: TagFilter):
+        self.left, self.right = left, right
+
+    def __repr__(self):
+        return "({} AND {})".format(self.left, self.right)
+
+
+class OrTagFilter(TagFilter):
+    """Disjunction node holding a left and a right operand filter."""
+
+    def __init__(self, left: TagFilter, right: TagFilter):
+        self.left, self.right = left, right
+
+    def __repr__(self):
+        return "({} OR {})".format(self.left, self.right)
+
+
+class NotTagFilter(TagFilter):
+    """Negation node wrapping a single operand filter."""
+
+    def __init__(self, filter: TagFilter):
+        # Stored under the same name the reader uses when walking the tree.
+        self.filter = filter
+
+    def __repr__(self):
+        return "NOT({})".format(self.filter)
+
+
+# Convenience factory functions
+def tag_eq(column_name: str, value: str) -> TagFilter:
+    """Build an equality filter: column == value."""
+    operator = ComparisonTagFilter.EQ
+    return ComparisonTagFilter(column_name, value, op=operator)
+
+
+def tag_neq(column_name: str, value: str) -> TagFilter:
+    """Build an inequality filter: column != value."""
+    operator = ComparisonTagFilter.NEQ
+    return ComparisonTagFilter(column_name, value, op=operator)
+
+
+def tag_lt(column_name: str, value: str) -> TagFilter:
+    """Build a less-than filter: column < value."""
+    operator = ComparisonTagFilter.LT
+    return ComparisonTagFilter(column_name, value, op=operator)
+
+
+def tag_lteq(column_name: str, value: str) -> TagFilter:
+    """Build a less-than-or-equal filter: column <= value."""
+    operator = ComparisonTagFilter.LTEQ
+    return ComparisonTagFilter(column_name, value, op=operator)
+
+
+def tag_gt(column_name: str, value: str) -> TagFilter:
+    """Build a greater-than filter: column > value."""
+    operator = ComparisonTagFilter.GT
+    return ComparisonTagFilter(column_name, value, op=operator)
+
+
+def tag_gteq(column_name: str, value: str) -> TagFilter:
+    """Build a greater-than-or-equal filter: column >= value."""
+    operator = ComparisonTagFilter.GTEQ
+    return ComparisonTagFilter(column_name, value, op=operator)
+
+
+def tag_regexp(column_name: str, pattern: str) -> TagFilter:
+    """Create a tag regex match filter.
+
+    :param column_name: name of the tag column to test.
+    :param pattern: regular expression, passed through unchanged as the
+        filter's value.
+    """
+    return ComparisonTagFilter(
+        column_name, pattern, ComparisonTagFilter.REGEXP)
+
+
+def tag_not_regexp(column_name: str, pattern: str) -> TagFilter:
+    """Create a tag regex not-match filter.
+
+    :param column_name: name of the tag column to test.
+    :param pattern: regular expression, passed through unchanged as the
+        filter's value.
+    """
+    return ComparisonTagFilter(
+        column_name, pattern, ComparisonTagFilter.NOT_REGEXP)
+
+
+def tag_between(column_name: str, lower: str, upper: str) -> TagFilter:
+    """Build a BETWEEN filter: lower <= column <= upper."""
+    return BetweenTagFilter(column_name, lower, upper, False)
+
+
+def tag_not_between(column_name: str, lower: str, upper: str) -> TagFilter:
+    """Build a NOT BETWEEN filter over the same bounds as tag_between."""
+    return BetweenTagFilter(column_name, lower, upper, True)
diff --git a/python/tsfile/tsfile_cpp.pxd b/python/tsfile/tsfile_cpp.pxd
index 74f3a7d9..71d60250 100644
--- a/python/tsfile/tsfile_cpp.pxd
+++ b/python/tsfile/tsfile_cpp.pxd
@@ -182,6 +182,8 @@ cdef extern from "cwrapper/tsfile_cwrapper.h":
# Function Declarations
+ ctypedef void * TagFilterHandle
+
# reader:new and close
TsFileReader tsfile_reader_new(const char * pathname, ErrorCode *
err_code);
ErrorCode tsfile_reader_close(TsFileReader reader)
@@ -269,12 +271,15 @@ cdef extern from "cwrapper/tsfile_cwrapper.h":
char** column_names,
int column_names_len,
int offset, int limit,
- ErrorCode* err_code);
+ TagFilterHandle tag_filter,
+ int batch_size,
+ ErrorCode* err_code);
ResultSet tsfile_query_table_batch(TsFileReader reader,
const char * table_name,
char** columns, uint32_t column_num,
int64_t start_time, int64_t end_time,
+ TagFilterHandle tag_filter,
int batch_size, ErrorCode* err_code);
ResultSet _tsfile_reader_query_device(TsFileReader reader,
@@ -306,6 +311,54 @@ cdef extern from "cwrapper/tsfile_cwrapper.h":
void tsfile_free_device_timeseries_metadata_map(
DeviceTimeseriesMetadataMap * map);
+ # Tag filter types and functions
+
+
+ ctypedef enum TagFilterOp:
+ TAG_FILTER_EQ = 0,
+ TAG_FILTER_NEQ = 1,
+ TAG_FILTER_LT = 2,
+ TAG_FILTER_LTEQ = 3,
+ TAG_FILTER_GT = 4,
+ TAG_FILTER_GTEQ = 5,
+ TAG_FILTER_REGEXP = 6,
+ TAG_FILTER_NOT_REGEXP = 7,
+
+ TagFilterHandle tsfile_tag_filter_create(TsFileReader reader,
+ const char* table_name,
+ const char* column_name,
+ const char* value,
+ TagFilterOp op,
+ ErrorCode* err_code)
+
+ TagFilterHandle tsfile_tag_filter_between(TsFileReader reader,
+ const char* table_name,
+ const char* column_name,
+ const char* lower,
+ const char* upper,
+ bint is_not,
+ ErrorCode* err_code)
+
+ TagFilterHandle tsfile_tag_filter_and(TagFilterHandle left,
+ TagFilterHandle right)
+
+ TagFilterHandle tsfile_tag_filter_or(TagFilterHandle left,
+ TagFilterHandle right)
+
+ TagFilterHandle tsfile_tag_filter_not(TagFilterHandle filter)
+
+ void tsfile_tag_filter_free(TagFilterHandle filter)
+
+ ResultSet tsfile_query_table_with_tag_filter(TsFileReader reader,
+ const char* table_name,
+ char** columns,
+ uint32_t column_num,
+ int64_t start_time,
+ int64_t end_time,
+ TagFilterHandle tag_filter,
+ int batch_size,
+ ErrorCode* err_code)
+
# resultSet : get data from resultSet
bint tsfile_result_set_next(ResultSet result_set, ErrorCode * err_code);
bint tsfile_result_set_is_null_by_index(ResultSet result_set, uint32_t
column_index);
diff --git a/python/tsfile/tsfile_py_cpp.pxd b/python/tsfile/tsfile_py_cpp.pxd
index b6baee80..55da081c 100644
--- a/python/tsfile/tsfile_py_cpp.pxd
+++ b/python/tsfile/tsfile_py_cpp.pxd
@@ -53,7 +53,8 @@ cdef public api ResultSet
tsfile_reader_query_table_c(TsFileReader reader, objec
cdef public api ResultSet tsfile_reader_query_table_on_tree_c(TsFileReader
reader, object column_list,
int64_t start_time, int64_t
end_time)
cdef public api ResultSet tsfile_reader_query_table_batch_c(TsFileReader
reader, object table_name, object column_list,
- int64_t start_time, int64_t
end_time, int batch_size)
+ int64_t start_time, int64_t
end_time,
+ TagFilterHandle tag_filter,
int batch_size)
cdef public api ResultSet tsfile_reader_query_paths_c(TsFileReader reader,
object device_name, object sensor_list, int64_t start_time,
int64_t end_time)
@@ -63,7 +64,13 @@ cdef public api ResultSet
tsfile_reader_query_tree_by_row_c(TsFileReader reader,
cdef public api ResultSet tsfile_reader_query_table_by_row_c(TsFileReader
reader, object table_name,
object
column_list, int offset,
- int limit)
+ int limit,
TagFilterHandle tag_filter,
+ int batch_size)
+cdef public api ResultSet
tsfile_reader_query_table_with_tag_filter_c(TsFileReader reader, object
table_name,
+ object
column_list, int64_t start_time,
+ int64_t
end_time, TagFilterHandle tag_filter,
+ int
batch_size)
+
cdef public api object get_table_schema(TsFileReader reader, object table_name)
cdef public api object get_all_table_schema(TsFileReader reader)
cdef public api object get_all_timeseries_schema(TsFileReader reader)
diff --git a/python/tsfile/tsfile_py_cpp.pyx b/python/tsfile/tsfile_py_cpp.pyx
index c564304f..67ea9b34 100644
--- a/python/tsfile/tsfile_py_cpp.pyx
+++ b/python/tsfile/tsfile_py_cpp.pyx
@@ -810,7 +810,9 @@ cdef ResultSet
tsfile_reader_query_tree_by_row_c(TsFileReader reader,
cdef ResultSet tsfile_reader_query_table_by_row_c(TsFileReader reader,
object table_name,
object column_list,
- int offset, int limit):
+ int offset, int limit,
+ TagFilterHandle tag_filter,
+ int batch_size):
cdef ResultSet result
cdef int column_num = len(column_list)
cdef char** columns = <char**> malloc(sizeof(char *) * column_num)
@@ -829,7 +831,7 @@ cdef ResultSet
tsfile_reader_query_table_by_row_c(TsFileReader reader,
result = tsfile_reader_query_table_by_row(reader,
table_name_c, columns,
column_num,
- offset, limit, &code)
+ offset, limit, tag_filter,
batch_size, &code)
check_error(code)
return result
finally:
@@ -842,7 +844,8 @@ cdef ResultSet
tsfile_reader_query_table_by_row_c(TsFileReader reader,
columns = NULL
cdef ResultSet tsfile_reader_query_table_batch_c(TsFileReader reader, object
table_name, object column_list,
- int64_t start_time, int64_t
end_time, int batch_size):
+ int64_t start_time, int64_t
end_time, TagFilterHandle tag_filter,
+ int batch_size):
cdef ResultSet result
cdef int column_num = len(column_list)
cdef bytes table_name_bytes = PyUnicode_AsUTF8String(table_name)
@@ -859,7 +862,7 @@ cdef ResultSet
tsfile_reader_query_table_batch_c(TsFileReader reader, object tab
raise MemoryError("Failed to allocate memory for column name")
result = tsfile_query_table_batch(reader, table_name_c, columns,
column_num, start_time, end_time,
- batch_size, &code)
+ tag_filter, batch_size, &code)
check_error(code)
return result
finally:
@@ -901,6 +904,36 @@ cdef ResultSet tsfile_reader_query_paths_c(TsFileReader
reader, object device_na
free(<void *> sensor_list_c)
sensor_list_c = NULL
+# Run a time-range table query with an optional tag filter through the C
+# wrapper. `column_list` is copied into a temporary C string array that is
+# always released before returning; `tag_filter` is only borrowed here and is
+# not freed by this function.
+cdef ResultSet tsfile_reader_query_table_with_tag_filter_c(TsFileReader reader, object table_name,
+                                                           object column_list, int64_t start_time,
+                                                           int64_t end_time, TagFilterHandle tag_filter,
+                                                           int batch_size):
+    cdef ResultSet result
+    cdef int column_num = len(column_list)
+    cdef bytes table_name_bytes = PyUnicode_AsUTF8String(table_name)
+    cdef const char * table_name_c = table_name_bytes
+    cdef char** columns = <char**> malloc(sizeof(char *) * column_num)
+    cdef int i
+    cdef ErrorCode code = 0
+    if columns == NULL:
+        raise MemoryError("Failed to allocate memory for columns")
+    # malloc() leaves the slots uninitialised. Clear them first so that, if a
+    # later strdup()/encode fails part-way, the cleanup loop below only ever
+    # passes NULL or a valid strdup() result to free().
+    for i in range(column_num):
+        columns[i] = NULL
+    try:
+        for i in range(column_num):
+            columns[i] = strdup((<str> column_list[i]).encode('utf-8'))
+            if columns[i] == NULL:
+                raise MemoryError("Failed to allocate memory for column name")
+        result = tsfile_query_table_with_tag_filter(reader, table_name_c, columns, column_num,
+                                                    start_time, end_time, tag_filter, batch_size, &code)
+        check_error(code)
+        return result
+    finally:
+        if columns != NULL:
+            for i in range(column_num):
+                free(<void *> columns[i])
+                columns[i] = NULL
+            free(<void *> columns)
+            columns = NULL
+
cdef object get_table_schema(TsFileReader reader, object table_name):
cdef bytes table_name_bytes = PyUnicode_AsUTF8String(table_name)
cdef const char * table_name_c = table_name_bytes
diff --git a/python/tsfile/tsfile_reader.pyx b/python/tsfile/tsfile_reader.pyx
index 2259f770..dbc598f3 100644
--- a/python/tsfile/tsfile_reader.pyx
+++ b/python/tsfile/tsfile_reader.pyx
@@ -22,7 +22,6 @@ import weakref
from typing import List, Optional, Dict
import pandas as pd
-from libc.stdint cimport INT64_MIN, INT64_MAX
from libc.string cimport strlen
from cpython.bytes cimport PyBytes_FromStringAndSize
from libc.string cimport memset
@@ -31,6 +30,7 @@ from libc.stdint cimport INT64_MIN, INT64_MAX, uintptr_t
from tsfile.schema import TSDataType as TSDataTypePy
from tsfile.schema import DeviceID, DeviceTimeseriesMetadataGroup
+from tsfile.tag_filter import ComparisonTagFilter, BetweenTagFilter,
AndTagFilter, OrTagFilter, NotTagFilter
from .date_utils import parse_int_to_date
from .tsfile_cpp cimport *
from .tsfile_py_cpp cimport *
@@ -48,6 +48,9 @@ cdef class ResultSetPy:
cdef ResultSet result
cdef object metadata
+ # Tag filter handle owned by this result set (freed on close).
+ cdef TagFilterHandle _tag_filter_handle
+
# ResultSet is valid or not, if the reader is closed, valid will be False.
cdef object valid
# The reader
@@ -59,6 +62,7 @@ cdef class ResultSetPy:
self.valid = True
self.tsfile_reader = weakref.ref(tsfile_reader)
self.is_tree = is_tree
+ self._tag_filter_handle = NULL
cdef init_c(self, ResultSet result, object device_name):
"""
@@ -273,6 +277,10 @@ cdef class ResultSetPy:
if self.result != NULL:
free_tsfile_result_set(&self.result)
+ if self._tag_filter_handle != NULL:
+ tsfile_tag_filter_free(self._tag_filter_handle)
+ self._tag_filter_handle = NULL
+
if self.tsfile_reader is not None:
reader = self.tsfile_reader()
if reader is not None:
@@ -318,32 +326,78 @@ cdef class TsFileReaderPy:
self.reader = tsfile_reader_new_c(pathname)
def query_table(self, table_name : str, column_names : List[str],
- start_time : int = INT64_MIN, end_time : int = INT64_MAX)
-> ResultSetPy:
+ start_time : int = INT64_MIN, end_time : int = INT64_MAX,
+ tag_filter = None, batch_size : int = 0) -> ResultSetPy:
"""
Execute a time range query on specified table and columns.
+ :param tag_filter: Optional TagFilter to filter by TAG column values.
+ :param batch_size: <= 0 for row-by-row mode; > 0 for batch (TsBlock)
mode.
:return: query result handler.
"""
- cdef ResultSet result;
- result = tsfile_reader_query_table_c(self.reader, table_name.lower(),
- [column_name.lower() for
column_name in column_names], start_time,
- end_time)
- pyresult = ResultSetPy(self)
- pyresult.init_c(result, table_name)
- self.activate_result_set_list.add(pyresult)
- return pyresult
-
- def query_table_batch(self, table_name : str, column_names : List[str],
- start_time : int = INT64_MIN, end_time : int =
INT64_MAX,
- batch_size : int = 1024) -> ResultSetPy:
- cdef ResultSet result;
- result = tsfile_reader_query_table_batch_c(self.reader,
table_name.lower(),
- [column_name.lower() for
column_name in column_names],
- start_time, end_time,
batch_size)
+ cdef ResultSet result
+ cdef TagFilterHandle c_tag_filter = NULL
+ if tag_filter is not None:
+ c_tag_filter = self._build_c_tag_filter(table_name.lower(),
tag_filter)
+ if batch_size <= 0:
+ result = tsfile_reader_query_table_with_tag_filter_c(
+ self.reader, table_name.lower(),
+ [column_name.lower() for column_name in column_names],
+ start_time, end_time, c_tag_filter, batch_size)
+ else:
+ result = tsfile_reader_query_table_batch_c(
+ self.reader, table_name.lower(),
+ [column_name.lower() for column_name in column_names],
+ start_time, end_time, c_tag_filter, batch_size)
pyresult = ResultSetPy(self)
+ pyresult._tag_filter_handle = c_tag_filter
pyresult.init_c(result, table_name)
self.activate_result_set_list.add(pyresult)
return pyresult
+    cdef TagFilterHandle _build_c_tag_filter(self, str table_name, object tag_filter):
+        """Recursively build C TagFilterHandle from Python TagFilter tree.
+
+        :param table_name: table the filter columns belong to; passed to the
+            C factory functions for leaf filters.
+        :param tag_filter: root of the Python filter tree.
+        :return: handle of the root native filter.
+        :raises TypeError: if a node is not one of the known filter classes.
+        """
+        cdef ErrorCode code = 0
+        cdef TagFilterHandle handle = NULL
+        cdef TagFilterHandle left = NULL
+        cdef TagFilterHandle right = NULL
+        cdef bytes table_bytes
+        cdef bytes col_bytes
+        cdef bytes val_bytes
+        cdef bytes lower_bytes
+        cdef bytes upper_bytes
+
+        if isinstance(tag_filter, ComparisonTagFilter):
+            # Keep the encoded bytes in locals so the char* pointers stay
+            # valid for the duration of the C call.
+            table_bytes = table_name.encode('utf-8')
+            col_bytes = tag_filter.column_name.encode('utf-8')
+            val_bytes = tag_filter.value.encode('utf-8')
+            handle = tsfile_tag_filter_create(
+                self.reader, <const char*>table_bytes,
+                <const char*>col_bytes, <const char*>val_bytes,
+                <TagFilterOp>tag_filter.op, &code)
+            check_error(code)
+            return handle
+        elif isinstance(tag_filter, BetweenTagFilter):
+            table_bytes = table_name.encode('utf-8')
+            col_bytes = tag_filter.column_name.encode('utf-8')
+            lower_bytes = tag_filter.lower.encode('utf-8')
+            upper_bytes = tag_filter.upper.encode('utf-8')
+            handle = tsfile_tag_filter_between(
+                self.reader, <const char*>table_bytes,
+                <const char*>col_bytes, <const char*>lower_bytes,
+                <const char*>upper_bytes, tag_filter.is_not, &code)
+            check_error(code)
+            return handle
+        elif isinstance(tag_filter, (AndTagFilter, OrTagFilter)):
+            left = self._build_c_tag_filter(table_name, tag_filter.left)
+            try:
+                right = self._build_c_tag_filter(table_name, tag_filter.right)
+            except BaseException:
+                # The left subtree has no parent yet; free it so a failure
+                # on the right side does not leak it.
+                if left != NULL:
+                    tsfile_tag_filter_free(left)
+                raise
+            # NOTE(review): presumably the combinators take ownership of
+            # their operands (ResultSetPy.close frees only the root handle);
+            # confirm against the C wrapper.
+            if isinstance(tag_filter, AndTagFilter):
+                return tsfile_tag_filter_and(left, right)
+            return tsfile_tag_filter_or(left, right)
+        elif isinstance(tag_filter, NotTagFilter):
+            handle = self._build_c_tag_filter(table_name, tag_filter.filter)
+            return tsfile_tag_filter_not(handle)
+        else:
+            raise TypeError(f"Unknown tag filter type: {type(tag_filter)}")
def query_table_on_tree(self, column_names : List[str],
start_time : int = INT64_MIN, end_time : int =
INT64_MAX) -> ResultSetPy:
"""
@@ -377,14 +431,19 @@ cdef class TsFileReaderPy:
return pyresult
def query_table_by_row(self, table_name : str, column_names : List[str],
- offset : int = 0, limit : int = -1) ->
ResultSetPy:
+ offset : int = 0, limit : int = -1,
+ tag_filter = None, batch_size : int = 0
+ ) -> ResultSetPy:
"""
Execute table-model query by row with offset/limit.
"""
cdef ResultSet result
+ cdef TagFilterHandle c_tag_filter = NULL
+ if tag_filter is not None:
+ c_tag_filter = self._build_c_tag_filter(table_name.lower(),
tag_filter)
result = tsfile_reader_query_table_by_row_c(self.reader,
table_name.lower(),
[column_name.lower() for
column_name in column_names],
- offset, limit)
+ offset, limit,
c_tag_filter, batch_size)
pyresult = ResultSetPy(self)
pyresult.init_c(result, table_name)
self.activate_result_set_list.add(pyresult)