This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new b8885ae1 Optimize and Fix some bugs in TsFileDataFrame (#773)
b8885ae1 is described below
commit b8885ae149606b37995e7c5bf0e4385ac7e0fb6d
Author: YangCaiyin <[email protected]>
AuthorDate: Sun Apr 12 10:40:17 2026 +0800
Optimize and Fix some bugs in TsFileDataFrame (#773)
* optimize preview performance
* fix loc returning more data than requested
* fix loading problem
* fix query-by-row problem (temporary workaround)
* support sparse tags
* fix local build_ext by using opaque tag filter pointers in Cython API
* address review feedback
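
For context, a minimal usage sketch of the fixed loc behavior (the file name
is illustrative; paths and expectations mirror the tests in this change):
sparse series are aligned on the union of their timestamps, NaN-padded, and
no rows outside the requested window are returned.

    from tsfile import TsFileDataFrame

    with TsFileDataFrame("weather_sparse.tsfile", show_progress=False) as tsdf:
        aligned = tsdf.loc[0:2, ["weather.device_a.humidity",
                                 "weather.device_a.temperature"]]
        assert aligned.shape == (3, 2)  # union timeline 0..2, NaN where absent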
---
python/tests/test_tsfile_dataset.py | 465 +++++++++++++++++++++++++++++++++++-
python/tsfile/dataset/dataframe.py | 154 +++++++++---
python/tsfile/dataset/formatting.py | 59 ++---
python/tsfile/dataset/metadata.py | 90 +++++--
python/tsfile/dataset/reader.py | 93 ++++++--
python/tsfile/tsfile_py_cpp.pxd | 8 +-
python/tsfile/tsfile_py_cpp.pyx | 6 +-
7 files changed, 755 insertions(+), 120 deletions(-)
diff --git a/python/tests/test_tsfile_dataset.py b/python/tests/test_tsfile_dataset.py
index 379bb547..ef6522b6 100644
--- a/python/tests/test_tsfile_dataset.py
+++ b/python/tests/test_tsfile_dataset.py
@@ -25,7 +25,7 @@ from tsfile import ColumnCategory, ColumnSchema, TSDataType, TableSchema, TsFile
from tsfile import AlignedTimeseries, Timeseries, TsFileDataFrame
from tsfile.dataset.formatting import format_timestamp
from tsfile.dataset.metadata import MetadataCatalog, build_series_path, resolve_series_path
-from tsfile.dataset.reader import TsFileSeriesReader
+from tsfile.dataset.reader import TsFileSeriesReader, _build_exact_tag_filter
def _write_weather_file(path, start):
@@ -63,6 +63,19 @@ def _write_weather_rows_file(path, rows):
writer.write_dataframe(df)
+def _write_empty_weather_file(path):
+ schema = TableSchema(
+ "weather",
+ [
+ ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
+ ColumnSchema("temperature", TSDataType.DOUBLE,
ColumnCategory.FIELD),
+ ColumnSchema("humidity", TSDataType.DOUBLE, ColumnCategory.FIELD),
+ ],
+ )
+ with TsFileTableWriter(str(path), schema):
+ pass
+
+
def _write_numeric_and_text_file(path):
schema = TableSchema(
"weather",
@@ -229,6 +242,162 @@ def test_dataset_basic_access_patterns(tmp_path, capsys):
assert "AlignedTimeseries(6 rows, 2 series)" in capsys.readouterr().out
+def test_dataset_loc_aligns_timestamp_union_and_preserves_requested_order(tmp_path):
+ path = tmp_path / "weather_sparse.tsfile"
+ _write_weather_rows_file(
+ path,
+ {
+ "time": [0, 1, 2],
+ "device": ["device_a", "device_a", "device_a"],
+ "temperature": [10.0, np.nan, 30.0],
+ "humidity": [np.nan, 200.0, 300.0],
+ },
+ )
+
+ with TsFileDataFrame(str(path), show_progress=False) as tsdf:
+ aligned = tsdf.loc[
+ 0:2,
+ [
+ "weather.device_a.humidity",
+ "weather.device_a.temperature",
+ ],
+ ]
+
+ assert isinstance(aligned, AlignedTimeseries)
+ assert aligned.series_names == [
+ "weather.device_a.humidity",
+ "weather.device_a.temperature",
+ ]
+    np.testing.assert_array_equal(aligned.timestamps, np.array([0, 1, 2], dtype=np.int64))
+ assert aligned.shape == (3, 2)
+ assert np.isnan(aligned.values[0, 0])
+ assert aligned.values[0, 1] == 10.0
+ assert aligned.values[1, 0] == 200.0
+ assert np.isnan(aligned.values[1, 1])
+ assert aligned.values[2, 0] == 300.0
+ assert aligned.values[2, 1] == 30.0
+
+
+def test_dataset_loc_supports_single_timestamp_and_mixed_series_specifiers(tmp_path):
+ path = tmp_path / "weather.tsfile"
+ _write_weather_file(path, 0)
+
+ with TsFileDataFrame(str(path), show_progress=False) as tsdf:
+ aligned = tsdf.loc[1, [0, "weather.device_a.humidity"]]
+
+ assert isinstance(aligned, AlignedTimeseries)
+ assert aligned.series_names == [
+ "weather.device_a.temperature",
+ "weather.device_a.humidity",
+ ]
+    np.testing.assert_array_equal(aligned.timestamps, np.array([1], dtype=np.int64))
+ np.testing.assert_array_equal(aligned.values, np.array([[21.5, 52.0]]))
+
+
+def test_dataset_loc_supports_open_ended_ranges_and_negative_series_index(tmp_path):
+ path = tmp_path / "weather.tsfile"
+ _write_weather_file(path, 100)
+
+ with TsFileDataFrame(str(path), show_progress=False) as tsdf:
+ aligned = tsdf.loc[:101, [-1]]
+
+ assert isinstance(aligned, AlignedTimeseries)
+ assert aligned.series_names == ["weather.device_a.humidity"]
+    np.testing.assert_array_equal(aligned.timestamps, np.array([100, 101], dtype=np.int64))
+    np.testing.assert_array_equal(aligned.values, np.array([[50.0], [52.0]]))
+
+
+def test_dataset_loc_with_nulls_does_not_expand_beyond_requested_time_range(tmp_path):
+ path = tmp_path / "weather_sparse_range.tsfile"
+ _write_weather_rows_file(
+ path,
+ {
+ "time": [0, 1, 2, 3],
+ "device": ["device_a", "device_a", "device_a", "device_a"],
+ "temperature": [10.0, np.nan, np.nan, 40.0],
+ "humidity": [np.nan, 20.0, np.nan, 50.0],
+ },
+ )
+
+ with TsFileDataFrame(str(path), show_progress=False) as tsdf:
+ aligned = tsdf.loc[
+ 1:2,
+ [
+ "weather.device_a.temperature",
+ "weather.device_a.humidity",
+ ],
+ ]
+
+ assert isinstance(aligned, AlignedTimeseries)
+    np.testing.assert_array_equal(aligned.timestamps, np.array([1, 2], dtype=np.int64))
+ assert aligned.shape == (2, 2)
+ assert np.isnan(aligned.values[0, 0])
+ assert aligned.values[0, 1] == 20.0
+ assert np.isnan(aligned.values[1, 0])
+ assert np.isnan(aligned.values[1, 1])
+
+
+def test_dataset_loc_single_timestamp_with_nulls_keeps_exact_time_window(tmp_path):
+ path = tmp_path / "weather_sparse_point.tsfile"
+ _write_weather_rows_file(
+ path,
+ {
+ "time": [0, 1, 2],
+ "device": ["device_a", "device_a", "device_a"],
+ "temperature": [10.0, np.nan, 30.0],
+ "humidity": [np.nan, 20.0, 40.0],
+ },
+ )
+
+ with TsFileDataFrame(str(path), show_progress=False) as tsdf:
+ aligned = tsdf.loc[
+ 1,
+ [
+ "weather.device_a.temperature",
+ "weather.device_a.humidity",
+ ],
+ ]
+
+ assert isinstance(aligned, AlignedTimeseries)
+    np.testing.assert_array_equal(aligned.timestamps, np.array([1], dtype=np.int64))
+ assert aligned.shape == (1, 2)
+ assert np.isnan(aligned.values[0, 0])
+ assert aligned.values[0, 1] == 20.0
+
+
+def test_dataset_repr_only_builds_preview_rows(tmp_path, monkeypatch):
+ path = tmp_path / "weather.tsfile"
+ _write_weather_file(path, 0)
+
+ with TsFileDataFrame(str(path), show_progress=False) as tsdf:
+ tsdf._index.series_refs_ordered = [(0, 0)] * 1000
+
+ built_rows = []
+
+ def fake_build_series_info(series_ref):
+ built_rows.append(series_ref)
+ return {
+ "table_name": "weather",
+ "field": "temperature",
+ "tag_columns": ("device",),
+ "tag_values": {"device": "device_a"},
+ "min_time": 0,
+ "max_time": 2,
+ "count": 3,
+ }
+
+ def fail_build_series_name(_series_ref):
+ raise AssertionError("__repr__ should not build full series names
for preview output")
+
+ monkeypatch.setattr(tsdf, "_build_series_info", fake_build_series_info)
+ monkeypatch.setattr(tsdf, "_build_series_name", fail_build_series_name)
+
+ rendered = repr(tsdf)
+ assert "TsFileDataFrame(1000 time series, 1 files)" in rendered
+ assert "..." in rendered
+ assert len(built_rows) == 20
+
+
def test_dataset_exposes_only_numeric_fields_and_keeps_nan(tmp_path):
path = tmp_path / "numeric_and_text.tsfile"
_write_numeric_and_text_file(path)
@@ -359,6 +528,31 @@ def test_dataset_rejects_incompatible_table_schemas_across_shards(tmp_path):
TsFileDataFrame([str(path1), str(path2)], show_progress=False)
+def test_dataset_skips_empty_tsfile_shards(tmp_path):
+ empty_path = tmp_path / "empty.tsfile"
+ data_path = tmp_path / "part.tsfile"
+ _write_empty_weather_file(empty_path)
+ _write_weather_file(data_path, 0)
+
+    with TsFileDataFrame([str(empty_path), str(data_path)], show_progress=False) as tsdf:
+ assert tsdf.list_timeseries() == [
+ "weather.device_a.temperature",
+ "weather.device_a.humidity",
+ ]
+
+
+def test_reader_allows_empty_tsfile(tmp_path):
+ path = tmp_path / "empty.tsfile"
+ _write_empty_weather_file(path)
+
+ reader = TsFileSeriesReader(str(path), show_progress=False)
+ try:
+ assert reader.series_paths == []
+ assert reader.catalog.series_count == 0
+ finally:
+ reader.close()
+
+
def test_dataset_multi_tag_metadata_discovery(tmp_path):
path = tmp_path / "multi_tag.tsfile"
_write_multi_tag_file(path)
@@ -453,6 +647,61 @@ def test_reader_catalog_shares_device_metadata_and_resolves_paths(tmp_path):
reader.close()
+def test_reader_read_series_by_row_retries_across_native_row_query_boundaries():
+ class _FakeResultSet:
+ def __init__(self, rows):
+ self._rows = rows
+ self._index = -1
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ return False
+
+ def next(self):
+ self._index += 1
+ return self._index < len(self._rows)
+
+ def get_value_by_name(self, name):
+ return self._rows[self._index][name]
+
+ class _FakeNativeReader:
+ def __init__(self, timestamps, values, boundary):
+ self._timestamps = timestamps
+ self._values = values
+ self._boundary = boundary
+
+        def query_table_by_row(self, table_name, column_names, offset=0, limit=-1, tag_filter=None):
+ assert table_name == "pvf"
+ assert column_names == ["totalcloudcover"]
+ assert tag_filter is None
+ if limit < 0:
+ stop = len(self._timestamps)
+ else:
+ stop = min(offset + limit, len(self._timestamps))
+
+ # Simulate the current native bug: one row query cannot cross the
+ # next internal boundary, so callers must re-issue from the
+ # advanced offset to complete a large logical window.
+            chunk_stop = min(stop, ((offset // self._boundary) + 1) * self._boundary)
+ rows = [
+ {"time": int(self._timestamps[idx]), "totalcloudcover":
float(self._values[idx])}
+ for idx in range(offset, chunk_stop)
+ ]
+ return _FakeResultSet(rows)
+
+ reader = object.__new__(TsFileSeriesReader)
+    reader._reader = _FakeNativeReader(np.arange(30, dtype=np.int64), np.arange(30, dtype=np.float64), boundary=10)
+ reader._catalog = MetadataCatalog()
+ table_id = reader._catalog.add_table("pvf", (), (), ("totalcloudcover",))
+ device_id = reader._catalog.add_device(table_id, (), 0, 29)
+
+ ts_arr, values = reader.read_series_by_row(device_id, 0, 5, 12)
+ np.testing.assert_array_equal(ts_arr, np.arange(5, 17, dtype=np.int64))
+ np.testing.assert_array_equal(values, np.arange(5, 17, dtype=np.float64))
+
+
def test_series_path_resolution_allows_prefix_tag_values():
catalog = MetadataCatalog()
table_id = catalog.add_table(
@@ -474,3 +723,217 @@ def test_series_path_resolution_allows_prefix_tag_values():
series_path = build_series_path(catalog, device_id, 0)
assert series_path == "weather.site_a.device_a.temperature"
assert resolve_series_path(catalog, series_path) == (table_id, device_id, 0)
+
+
+def test_series_path_resolution_allows_missing_trailing_tag_value():
+ catalog = MetadataCatalog()
+ table_id = catalog.add_table(
+ "weather",
+ ("device",),
+ (TSDataType.STRING,),
+ ("temperature",),
+ )
+ device_id = catalog.add_device(table_id, (), 0, 1)
+ catalog.series_stats_by_ref[(device_id, 0)] = {
+ "length": 1,
+ "min_time": 0,
+ "max_time": 0,
+ "timeline_length": 1,
+ "timeline_min_time": 0,
+ "timeline_max_time": 0,
+ }
+
+ series_path = build_series_path(catalog, device_id, 0)
+ assert series_path == "weather.temperature"
+    assert resolve_series_path(catalog, series_path) == (table_id, device_id, 0)
+
+
+def test_series_path_resolution_uses_named_tags_for_sparse_non_prefix_values():
+ catalog = MetadataCatalog()
+ table_id = catalog.add_table(
+ "weather",
+ ("city", "device", "region"),
+ (TSDataType.STRING, TSDataType.STRING, TSDataType.STRING),
+ ("temperature",),
+ )
+ device_id = catalog.add_device(table_id, (None, "device_a", None), 0, 1)
+ catalog.series_stats_by_ref[(device_id, 0)] = {
+ "length": 1,
+ "min_time": 0,
+ "max_time": 0,
+ "timeline_length": 1,
+ "timeline_min_time": 0,
+ "timeline_max_time": 0,
+ }
+
+ series_path = build_series_path(catalog, device_id, 0)
+ assert series_path == "weather.device_a.temperature"
+    assert resolve_series_path(catalog, series_path) == (table_id, device_id, 0)
+
+
+def test_reader_metadata_tag_values_trim_trailing_none():
+ class _Group:
+ segments = ("weather", "device_a", None, None)
+
+    assert TsFileSeriesReader._metadata_tag_values(_Group(), 3) == ("device_a",)
+    assert TsFileSeriesReader._metadata_tag_values(_Group(), 1) == ("device_a",)
+
+
+def test_exact_tag_filter_rejects_none_tag_values():
+ with pytest.raises(NotImplementedError, match="IS NULL / IS NOT NULL"):
+ _build_exact_tag_filter({"device": None})
+ with pytest.raises(NotImplementedError, match="IS NULL / IS NOT NULL"):
+ _build_exact_tag_filter({"city": "beijing", "device": None})
+
+
+def test_reader_exact_match_with_none_tag_values_fails_fast():
+ class _FakeNativeReader:
+ def query_table(self, *args, **kwargs):
+ raise AssertionError("query should not be issued when None-tag
exact matching is unsupported")
+
+ def query_table_by_row(self, *args, **kwargs):
+ raise AssertionError("row query should not be issued when None-tag
exact matching is unsupported")
+
+ reader = object.__new__(TsFileSeriesReader)
+ reader._reader = _FakeNativeReader()
+ reader._catalog = MetadataCatalog()
+ table_id = reader._catalog.add_table(
+ "weather",
+ ("city", "device", "region"),
+ (TSDataType.STRING, TSDataType.STRING, TSDataType.STRING),
+ ("temperature",),
+ )
+ device_id = reader._catalog.add_device(table_id, (None, "device_a",
"north"), 0, 1)
+ reader._catalog.series_stats_by_ref[(device_id, 0)] = {
+ "length": 2,
+ "min_time": 0,
+ "max_time": 1,
+ "timeline_length": 2,
+ "timeline_min_time": 0,
+ "timeline_max_time": 1,
+ }
+
+ with pytest.raises(NotImplementedError, match="IS NULL / IS NOT NULL"):
+ reader.read_series_by_ref(device_id, 0, 0, 1)
+ with pytest.raises(NotImplementedError, match="IS NULL / IS NOT NULL"):
+ reader.read_series_by_row(device_id, 0, 0, 2)
+
+
+def test_dataframe_resolves_named_sparse_tag_series_path():
+ tsdf = object.__new__(TsFileDataFrame)
+ tsdf._index = dataframe_module._LogicalIndex()
+ tsdf._index.table_entries["weather"] = dataframe_module.TableEntry(
+ table_name="weather",
+ tag_columns=("city", "device", "region"),
+ tag_types=(TSDataType.STRING, TSDataType.STRING, TSDataType.STRING),
+ field_columns=("temperature",),
+ )
+ device_key = ("weather", (None, "device_a"))
+ tsdf._index.device_order = [device_key]
+ tsdf._index.device_index_by_key = {device_key: 0}
+ tsdf._index.tables_with_sparse_tag_values = {"weather"}
+ tsdf._index.sparse_device_indices_by_compressed_path = {("weather",
("device_a",)): [0]}
+ tsdf._index.device_refs = [[]]
+ tsdf._index.series_refs_ordered = [(0, 0)]
+ tsdf._index.series_ref_set = {(0, 0)}
+ tsdf._index.series_ref_map = {(0, 0): []}
+
+ assert tsdf.list_timeseries() == ["weather.device_a.temperature"]
+ assert tsdf._resolve_series_name("weather.device_a.temperature") == (0, 0)
+
+
+def test_dataframe_list_timeseries_filters_named_sparse_tag_prefix():
+ tsdf = object.__new__(TsFileDataFrame)
+ tsdf._index = dataframe_module._LogicalIndex()
+ tsdf._index.table_entries["weather"] = dataframe_module.TableEntry(
+ table_name="weather",
+ tag_columns=("city", "device", "region"),
+ tag_types=(TSDataType.STRING, TSDataType.STRING, TSDataType.STRING),
+ field_columns=("temperature",),
+ )
+ tsdf._index.device_order = [
+ ("weather", (None, "device_a")),
+ ("weather", ("beijing", "device_b")),
+ ]
+ tsdf._index.device_index_by_key = {
+ ("weather", (None, "device_a")): 0,
+ ("weather", ("beijing", "device_b")): 1,
+ }
+ tsdf._index.tables_with_sparse_tag_values = {"weather"}
+ tsdf._index.sparse_device_indices_by_compressed_path = {
+ ("weather", ("device_a",)): [0],
+ ("weather", ("beijing", "device_b")): [1],
+ }
+ tsdf._index.device_refs = [[], []]
+ tsdf._index.series_refs_ordered = [(0, 0), (1, 0)]
+ tsdf._index.series_ref_set = {(0, 0), (1, 0)}
+ tsdf._index.series_ref_map = {(0, 0): [], (1, 0): []}
+
+ assert tsdf.list_timeseries("weather.device_a") ==
["weather.device_a.temperature"]
+
+
+def test_dataframe_list_timeseries_prefix_can_skip_full_name_build(tmp_path, monkeypatch):
+ path = tmp_path / "weather.tsfile"
+ _write_weather_file(path, 0)
+
+ with TsFileDataFrame(str(path), show_progress=False) as tsdf:
+ tsdf._index.series_refs_ordered = [(0, 0)] * 1000
+
+ def fail_build_series_name(_series_ref):
+ raise AssertionError("list_timeseries(prefix) should not build
full names for non-matching series")
+
+ monkeypatch.setattr(tsdf, "_build_series_name", fail_build_series_name)
+ assert tsdf.list_timeseries("pvf") == []
+
+
+def test_series_path_resolution_reports_ambiguous_sparse_path():
+ catalog = MetadataCatalog()
+ table_id = catalog.add_table(
+ "weather",
+ ("city", "device"),
+ (TSDataType.STRING, TSDataType.STRING),
+ ("temperature",),
+ )
+ first_id = catalog.add_device(table_id, ("beijing", None), 0, 1)
+ second_id = catalog.add_device(table_id, (None, "beijing"), 0, 1)
+ for device_id in (first_id, second_id):
+ catalog.series_stats_by_ref[(device_id, 0)] = {
+ "length": 1,
+ "min_time": 0,
+ "max_time": 0,
+ "timeline_length": 1,
+ "timeline_min_time": 0,
+ "timeline_max_time": 0,
+ }
+
+    assert build_series_path(catalog, first_id, 0) == "weather.beijing.temperature"
+    assert build_series_path(catalog, second_id, 0) == "weather.beijing.temperature"
+ with pytest.raises(ValueError, match="Ambiguous series path"):
+ resolve_series_path(catalog, "weather.beijing.temperature")
+
+
+def test_reader_show_progress_reports_start_immediately(tmp_path, capsys):
+ path = tmp_path / "weather.tsfile"
+ _write_weather_file(path, 0)
+
+ reader = TsFileSeriesReader(str(path), show_progress=True)
+ try:
+ stderr = capsys.readouterr().err
+ assert "Reading TsFile metadata: 0/1" in stderr
+ assert "Reading TsFile metadata: 1 table(s), 2 series ... done" in
stderr
+ finally:
+ reader.close()
+
+
+def test_dataframe_parallel_show_progress_reports_start_immediately(tmp_path, capsys):
+ path1 = tmp_path / "part1.tsfile"
+ path2 = tmp_path / "part2.tsfile"
+ _write_weather_file(path1, 0)
+ _write_weather_file(path2, 3)
+
+ with TsFileDataFrame([str(path1), str(path2)], show_progress=True):
+ pass
+
+ stderr = capsys.readouterr().err
+ assert "Loading TsFile shards: 0/2" in stderr
+ assert "Loading TsFile shards: 2/2 (4 series) ... done" in stderr
diff --git a/python/tsfile/dataset/dataframe.py b/python/tsfile/dataset/dataframe.py
index baef77c3..4cc373ca 100644
--- a/python/tsfile/dataset/dataframe.py
+++ b/python/tsfile/dataset/dataframe.py
@@ -29,7 +29,12 @@ import warnings
import numpy as np
from .formatting import format_dataframe_table
-from .metadata import TableEntry, _coerce_path_component, build_logical_series_path, split_logical_series_path
+from .metadata import (
+ TableEntry,
+ build_logical_series_components,
+ build_logical_series_path,
+ split_logical_series_path,
+)
from .merge import build_aligned_matrix, merge_time_value_parts, merge_timestamp_parts
from .timeseries import AlignedTimeseries, Timeseries
@@ -60,6 +65,11 @@ class _LogicalIndex:
device_order: List[DeviceKey] = field(default_factory=list)
# Map one logical device key to its dataframe-local device index.
device_index_by_key: Dict[DeviceKey, int] = field(default_factory=dict)
+ # Tables that need sparse compressed-path lookup because some devices
+ # contain non-trailing missing tag values.
+ tables_with_sparse_tag_values: Set[str] = field(default_factory=set)
+ # Map one compressed tree-style device path to sparse logical devices only.
+    sparse_device_indices_by_compressed_path: Dict[Tuple[str, Tuple[str, ...]], List[int]] = field(default_factory=dict)
# For each logical device, keep the contributing reader-local device refs.
device_refs: List[List[DeviceRef]] = field(default_factory=list)
@@ -155,6 +165,15 @@ def _register_reader(
index.device_index_by_key[device_key] = device_idx
index.device_order.append(device_key)
index.device_refs.append([])
+ if any(value is None for value in device_entry.tag_values):
+ index.tables_with_sparse_tag_values.add(table_entry.table_name)
+ compressed_components = tuple(
+ build_logical_series_components(
+                    table_entry.table_name, device_entry.tag_values, "", table_entry.tag_columns
+ )[1:-1]
+ )
+            compressed_key = (table_entry.table_name, compressed_components)
+            index.sparse_device_indices_by_compressed_path.setdefault(compressed_key, []).append(device_idx)
index.device_refs[device_idx].append((reader, device_id))
for field_idx in range(len(table_entry.field_columns)):
@@ -557,14 +576,31 @@ class TsFileDataFrame:
if not self._index.series_refs_ordered:
raise ValueError("No valid time series found in the provided
TsFile files")
+    def _show_loading_progress(self, done: int, total: int, total_series: int = None):
+ if not self._show_progress or total <= 0:
+ return
+
+ if total_series is None:
+ sys.stderr.write(f"\rLoading TsFile shards: {done}/{total}")
+ else:
+ sys.stderr.write(f"\rLoading TsFile shards: {done}/{total}
({total_series} series) ... done\n")
+ sys.stderr.flush()
+
def _load_metadata_serial(self, reader_class):
- for file_path in self._paths:
+ total = len(self._paths)
+ self._show_loading_progress(0, total)
+
+ for index, file_path in enumerate(self._paths, start=1):
_register_reader(
self._readers,
self._index,
file_path,
- reader_class(file_path, show_progress=self._show_progress),
+            reader_class(file_path, show_progress=self._show_progress and total == 1),
)
+ if total > 1:
+ self._show_loading_progress(index, total)
+
+        self._show_loading_progress(total, total, sum(reader.series_count for reader in self._readers.values()))
def _load_metadata_parallel(self, reader_class):
from concurrent.futures import ThreadPoolExecutor, as_completed
@@ -573,6 +609,7 @@ class TsFileDataFrame:
return file_path, reader_class(file_path, show_progress=False)
total = len(self._paths)
+ self._show_loading_progress(0, total)
with ThreadPoolExecutor(max_workers=min(total, os.cpu_count() or 4)) as executor:
futures = {executor.submit(open_file, path): path for path in self._paths}
results = {}
@@ -581,14 +618,9 @@ class TsFileDataFrame:
file_path, reader = future.result()
results[file_path] = reader
done += 1
- if self._show_progress:
- sys.stderr.write(f"\rLoading TsFile shards:
{done}/{total}")
- sys.stderr.flush()
+ self._show_loading_progress(done, total)
- if self._show_progress and total > 0:
-            total_series = sum(reader.series_count for reader in results.values())
- sys.stderr.write(f"\rLoading TsFile shards: {total}/{total}
({total_series} series) ... done\n")
- sys.stderr.flush()
+        self._show_loading_progress(total, total, sum(reader.series_count for reader in results.values()))
for file_path in self._paths:
_register_reader(
@@ -607,7 +639,7 @@ class TsFileDataFrame:
device_key, table_entry, field_idx = self._get_series_components(series_ref)
table_name, tag_values = device_key
field_name = table_entry.field_columns[field_idx]
- return build_logical_series_path(table_name, tag_values, field_name)
+        return build_logical_series_path(table_name, tag_values, field_name, table_entry.tag_columns)
def _resolve_series_name(self, series_name: str) -> SeriesRefKey:
try:
@@ -622,24 +654,33 @@ class TsFileDataFrame:
raise KeyError(_series_lookup_hint(series_name))
table_entry = self._index.table_entries[table_name]
- expected_parts = len(table_entry.tag_columns) + 2
- if len(parts) > expected_parts:
- raise KeyError(_series_lookup_hint(series_name))
-
field_name = parts[-1]
try:
field_idx = table_entry.get_field_index(field_name)
except ValueError as exc:
raise KeyError(_series_lookup_hint(series_name)) from exc
- tag_values = tuple(
- _coerce_path_component(raw_value, tag_type)
- for raw_value, tag_type in zip(parts[1:-1], table_entry.tag_types)
- )
- device_key = (table_name, tag_values)
- device_idx = self._index.device_index_by_key.get(device_key)
- if device_idx is None:
- raise KeyError(_series_lookup_hint(series_name))
+ tag_parts = parts[1:-1]
+        direct_device_idx = self._index.device_index_by_key.get((table_name, tuple(tag_parts)))
+
+ if table_name not in self._index.tables_with_sparse_tag_values:
+ if direct_device_idx is None:
+ raise KeyError(_series_lookup_hint(series_name))
+ device_idx = direct_device_idx
+ else:
+ compressed_key = (table_name, tuple(tag_parts))
+            sparse_device_indices = self._index.sparse_device_indices_by_compressed_path.get(compressed_key, [])
+ candidate_indices = []
+ if direct_device_idx is not None:
+ candidate_indices.append(direct_device_idx)
+ for device_idx in sparse_device_indices:
+ if device_idx not in candidate_indices:
+ candidate_indices.append(device_idx)
+ if not candidate_indices:
+ raise KeyError(_series_lookup_hint(series_name))
+ if len(candidate_indices) > 1:
+ raise KeyError(f"Ambiguous series path: '{series_name}'.")
+ device_idx = candidate_indices[0]
series_ref = (device_idx, field_idx)
if series_ref not in self._index.series_ref_set:
@@ -664,11 +705,26 @@ class TsFileDataFrame:
return len(self._index.series_refs_ordered)
def list_timeseries(self, path_prefix: str = "") -> List[str]:
-        names = [self._build_series_name(series_ref) for series_ref in self._index.series_refs_ordered]
if not path_prefix:
- return names
- prefix = path_prefix if path_prefix.endswith(".") else path_prefix +
"."
- return [name for name in names if name.startswith(prefix) or name ==
path_prefix]
+            return [self._build_series_name(series_ref) for series_ref in self._index.series_refs_ordered]
+
+ try:
+ prefix_parts = split_logical_series_path(path_prefix)
+ except ValueError:
+ return []
+
+ matched = []
+ for series_ref in self._index.series_refs_ordered:
+            device_key, table_entry, field_idx = self._get_series_components(series_ref)
+ components = build_logical_series_components(
+ table_entry.table_name,
+ device_key[1],
+ table_entry.field_columns[field_idx],
+ table_entry.tag_columns,
+ )
+ if prefix_parts == components[: len(prefix_parts)]:
+ matched.append(self._build_series_name(series_ref))
+ return matched
def _get_timeseries(self, series_ref: SeriesRefKey) -> Timeseries:
self._assert_open()
@@ -764,20 +820,44 @@ class TsFileDataFrame:
seen.setdefault(column, True)
return list(seen.keys())
+ @staticmethod
+    def _preview_indices(indices: List[int], max_rows: int) -> Tuple[List[int], bool, int]:
+ total = len(indices)
+ if total <= max_rows:
+ return indices, False, total
+
+ head = max_rows // 2
+ tail = max_rows - head
+ return list(indices[:head]) + list(indices[-tail:]), True, head
+
def _format_table(self, indices=None, max_rows: int = 20) -> str:
- series_names = []
- merged_info = {}
- for series_ref in self._index.series_refs_ordered:
- series_name = self._build_series_name(series_ref)
- series_names.append(series_name)
- merged_info[series_name] = self._build_series_info(series_ref)
+ if indices is None:
+ indices = list(range(len(self._index.series_refs_ordered)))
+ else:
+ indices = list(indices)
+
+        preview_indices, truncated, split_index = self._preview_indices(indices, max_rows)
+ rows = []
+ for idx in preview_indices:
+ series_ref = self._index.series_refs_ordered[idx]
+ info = self._build_series_info(series_ref)
+ row = {
+ "index": idx,
+ "table": info["table_name"],
+ "field": info["field"],
+ "start_time": info["min_time"],
+ "end_time": info["max_time"],
+ "count": info["count"],
+ }
+ row.update(info["tag_values"])
+ rows.append(row)
return format_dataframe_table(
- series_names,
- merged_info,
+ rows,
self._collect_tag_columns(),
- indices=indices,
- max_rows=max_rows,
+ total_count=len(indices),
+ truncated=truncated,
+ split_index=split_index,
)
def _repr_header(self) -> str:
diff --git a/python/tsfile/dataset/formatting.py b/python/tsfile/dataset/formatting.py
index 5e01bb39..527d8798 100644
--- a/python/tsfile/dataset/formatting.py
+++ b/python/tsfile/dataset/formatting.py
@@ -19,7 +19,7 @@
"""String formatting helpers for dataset objects."""
from datetime import datetime
-from typing import Dict, List, Optional
+from typing import List, Optional
import numpy as np
@@ -97,50 +97,35 @@ def format_aligned_timeseries(
def format_dataframe_table(
- series_list: List[str],
- merged_info: Dict[str, dict],
+ rows: List[dict],
tag_columns: List[str],
- indices: Optional[List[int]] = None,
- max_rows: int = 20,
+ total_count: int,
+ truncated: bool = False,
+ split_index: Optional[int] = None,
) -> str:
"""Render the metadata table used by TsFileDataFrame.__repr__."""
- if indices is None:
- indices = list(range(len(series_list)))
- else:
- indices = list(indices)
-
- total = len(indices)
- if total > max_rows:
-        show_indices = list(indices[: max_rows // 2]) + list(indices[-max_rows // 2 :])
- truncated = True
- else:
- show_indices = indices
- truncated = False
+ if not rows:
+ return "Empty TsFileDataFrame"
- rows = []
- for idx in show_indices:
- name = series_list[idx]
- info = merged_info[name]
- row = {
- "index": idx,
- "table": info["table_name"],
- "field": info["field"],
- "start_time": format_timestamp(info["min_time"]),
- "end_time": format_timestamp(info["max_time"]),
- "count": info["count"],
+ rendered_rows = []
+ for row in rows:
+ rendered = {
+ "index": row["index"],
+ "table": row["table"],
+ "field": row["field"],
+ "start_time": format_timestamp(row["start_time"]),
+ "end_time": format_timestamp(row["end_time"]),
+ "count": row["count"],
}
for tag_col in tag_columns:
- row[tag_col] = info["tag_values"].get(tag_col, "")
- rows.append(row)
-
- if not rows:
- return "Empty TsFileDataFrame"
+ rendered[tag_col] = row.get(tag_col, "")
+ rendered_rows.append(rendered)
headers = ["", "table"] + tag_columns + ["field", "start_time",
"end_time", "count"]
widths = {header: len(header) for header in headers}
- widths[""] = max(len(str(row["index"])) for row in rows)
+ widths[""] = max(len(str(row["index"])) for row in rendered_rows)
- for row in rows:
+ for row in rendered_rows:
widths[""] = max(widths[""], len(str(row["index"])))
widths["table"] = max(widths["table"], len(row["table"]))
widths["field"] = max(widths["field"], len(row["field"]))
@@ -151,8 +136,8 @@ def format_dataframe_table(
widths[tag_col] = max(widths[tag_col], len(str(row[tag_col])))
lines = [" ".join(header.rjust(widths[header]) for header in headers)]
- split = len(rows) // 2 if truncated else len(rows)
- for row_idx, row in enumerate(rows):
+    split = split_index if truncated and split_index is not None else len(rendered_rows)
+ for row_idx, row in enumerate(rendered_rows):
if truncated and row_idx == split:
lines.append("...")
parts = [str(row["index"]).rjust(widths[""]),
row["table"].rjust(widths["table"])]
diff --git a/python/tsfile/dataset/metadata.py b/python/tsfile/dataset/metadata.py
index 195e3e9f..2a4938b3 100644
--- a/python/tsfile/dataset/metadata.py
+++ b/python/tsfile/dataset/metadata.py
@@ -69,6 +69,8 @@ class MetadataCatalog:
device_entries: List[DeviceEntry] = field(default_factory=list)
table_id_by_name: Dict[str, int] = field(default_factory=dict)
device_id_by_key: Dict[Tuple[int, tuple], int] = field(default_factory=dict)
+ tables_with_sparse_tag_values: set = field(default_factory=set)
+    sparse_device_ids_by_compressed_path: Dict[Tuple[int, Tuple[str, ...]], List[int]] = field(default_factory=dict)
series_stats_by_ref: Dict[Tuple[int, int], Dict[str, int]] = field(default_factory=dict)
def add_table(
@@ -97,7 +99,8 @@ class MetadataCatalog:
min_time: int,
max_time: int,
) -> int:
- key = (table_id, tuple(tag_values))
+ normalized_tag_values = _normalize_tag_values(tag_values)
+ key = (table_id, normalized_tag_values)
if key in self.device_id_by_key:
return self.device_id_by_key[key]
@@ -105,12 +108,16 @@ class MetadataCatalog:
self.device_entries.append(
DeviceEntry(
table_id=table_id,
- tag_values=tuple(tag_values),
+ tag_values=normalized_tag_values,
min_time=min_time,
max_time=max_time,
)
)
self.device_id_by_key[key] = device_id
+ if _has_sparse_tag_holes(normalized_tag_values):
+ self.tables_with_sparse_tag_values.add(table_id)
+            compressed_key = (table_id, _compressed_tag_path_components(normalized_tag_values))
+            self.sparse_device_ids_by_compressed_path.setdefault(compressed_key, []).append(device_id)
return device_id
@property
@@ -122,6 +129,21 @@ def _escape_path_component(value: Any) -> str:
return str(value).replace(_PATH_ESCAPE, _PATH_ESCAPE * 2).replace(_PATH_SEPARATOR, _PATH_ESCAPE + _PATH_SEPARATOR)
+def _normalize_tag_values(tag_values: Iterable[Any]) -> Tuple[Any, ...]:
+ values = list(tag_values)
+ while values and values[-1] is None:
+ values.pop()
+ return tuple(values)
+
+
+def _compressed_tag_path_components(tag_values: Iterable[Any]) -> Tuple[str, ...]:
+ return tuple(str(value) for value in tag_values if value is not None)
+
+
+def _has_sparse_tag_holes(tag_values: Iterable[Any]) -> bool:
+ return any(value is None for value in tag_values)
+
+
def split_logical_series_path(series_path: str) -> List[str]:
parts = []
current = []
@@ -148,17 +170,37 @@ def split_logical_series_path(series_path: str) -> List[str]:
return parts
-def build_logical_series_path(table_name: str, tag_values: Iterable[Any], field_name: str) -> str:
- components = [table_name, *tag_values, field_name]
+def build_logical_series_path(
+ table_name: str,
+ tag_values: Iterable[Any],
+ field_name: str,
+ tag_columns: Iterable[str] = (),
+) -> str:
+    components = build_logical_series_components(table_name, tag_values, field_name, tag_columns)
    return _PATH_SEPARATOR.join(_escape_path_component(component) for component in components)
+def build_logical_series_components(
+ table_name: str,
+ tag_values: Iterable[Any],
+ field_name: str,
+ _tag_columns: Iterable[str] = (),
+) -> List[str]:
+    components = [table_name, *_compressed_tag_path_components(tag_values), field_name]
+ return [str(component) for component in components]
+
+
def build_series_path(catalog: MetadataCatalog, device_id: int, field_idx: int) -> str:
"""Return the external logical series name for one device field."""
device_entry = catalog.device_entries[device_id]
table_entry = catalog.table_entries[device_entry.table_id]
field_name = table_entry.field_columns[field_idx]
-    return build_logical_series_path(table_entry.table_name, device_entry.tag_values, field_name)
+ return build_logical_series_path(
+ table_entry.table_name,
+ device_entry.tag_values,
+ field_name,
+ table_entry.tag_columns,
+ )
def iter_series_refs(catalog: MetadataCatalog) -> Iterator[Tuple[int, int]]:
@@ -187,25 +229,45 @@ def resolve_series_path(catalog: MetadataCatalog, series_path: str) -> Tuple[int
table_id = catalog.table_id_by_name[table_name]
table_entry = catalog.table_entries[table_id]
- expected_parts = len(table_entry.tag_columns) + 2
- if len(parts) > expected_parts:
- raise ValueError(f"Series not found: {series_path}")
-
field_name = parts[-1]
try:
field_idx = table_entry.get_field_index(field_name)
except ValueError as exc:
raise ValueError(f"Series not found: {series_path}") from exc
- tag_values = tuple(
+ tag_parts = parts[1:-1]
+ direct_device_id = None
+ direct_tag_values = _normalize_tag_values(
_coerce_path_component(raw_value, tag_type)
- for raw_value, tag_type in zip(parts[1:-1], table_entry.tag_types)
+ for raw_value, tag_type in zip(tag_parts, table_entry.tag_types)
)
- key = (table_id, tag_values)
- if key not in catalog.device_id_by_key:
+ direct_key = (table_id, direct_tag_values)
+ if direct_key in catalog.device_id_by_key:
+ direct_device_id = catalog.device_id_by_key[direct_key]
+
+ if table_id not in catalog.tables_with_sparse_tag_values:
+ if direct_device_id is None:
+ raise ValueError(f"Series not found: {series_path}")
+ return table_id, direct_device_id, field_idx
+
+ compressed_key = (table_id, tuple(tag_parts))
+    sparse_device_ids = catalog.sparse_device_ids_by_compressed_path.get(compressed_key, [])
+ candidate_ids = []
+ seen_ids = set()
+ if direct_device_id is not None:
+ candidate_ids.append(direct_device_id)
+ seen_ids.add(direct_device_id)
+ for device_id in sparse_device_ids:
+ if device_id in seen_ids:
+ continue
+ candidate_ids.append(device_id)
+ seen_ids.add(device_id)
+ if not candidate_ids:
raise ValueError(f"Series not found: {series_path}")
+ if len(candidate_ids) > 1:
+ raise ValueError(f"Ambiguous series path: {series_path}")
- return table_id, catalog.device_id_by_key[key], field_idx
+ return table_id, candidate_ids[0], field_idx
def _coerce_path_component(value: str, data_type: TSDataType) -> Any:
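
A compact re-statement of the sparse-tag path rules above (normalize and
compressed are illustrative stand-ins for _normalize_tag_values and
_compressed_tag_path_components):

    def normalize(tags):
        tags = list(tags)
        while tags and tags[-1] is None:  # trailing holes are trimmed
            tags.pop()
        return tuple(tags)

    def compressed(tags):
        return tuple(str(t) for t in tags if t is not None)

    assert normalize(("device_a", None, None)) == ("device_a",)
    assert compressed((None, "device_a", None)) == ("device_a",)
    # ("beijing", None) and (None, "beijing") both compress to ("beijing",),
    # which is why resolve_series_path raises "Ambiguous series path".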
diff --git a/python/tsfile/dataset/reader.py b/python/tsfile/dataset/reader.py
index 953a22cb..cb5b7016 100644
--- a/python/tsfile/dataset/reader.py
+++ b/python/tsfile/dataset/reader.py
@@ -44,7 +44,16 @@ def _to_python_scalar(value):
return value.item() if hasattr(value, "item") else value
+def _ensure_supported_exact_tag_values(tag_values: Dict[str, object]) -> None:
+ if any(tag_value is None for tag_value in tag_values.values()):
+ raise NotImplementedError(
+ "Exact tag matching with None tag values is not supported yet. "
+ "Native tag filter support for IS NULL / IS NOT NULL is required."
+ )
+
+
def _build_exact_tag_filter(tag_values: Dict[str, object]):
+ _ensure_supported_exact_tag_values(tag_values)
tag_filter = None
for tag_column, tag_value in tag_values.items():
expr = tag_eq(tag_column, str(tag_value))
@@ -119,6 +128,9 @@ class TsFileSeriesReader:
self._catalog = MetadataCatalog()
table_names = list(table_schemas.keys())
metadata_groups = self._reader.get_timeseries_metadata(None)
+ if self.show_progress:
+ sys.stderr.write(f"\rReading TsFile metadata:
0/{len(table_names)}")
+ sys.stderr.flush()
for table_index, table_name in enumerate(table_names):
table_schema = table_schemas[table_name]
@@ -183,15 +195,12 @@ class TsFileSeriesReader:
)
sys.stderr.flush()
- if self.show_progress and self.series_count > 0:
+ if self.show_progress:
sys.stderr.write(
f"\rReading TsFile metadata: {len(table_names)} table(s),
{self.series_count} series ... done\n"
)
sys.stderr.flush()
- if self.series_count == 0:
- raise ValueError("No valid numeric series found in TsFile")
-
@staticmethod
def _metadata_device_stats(group) -> dict:
"""Derive cheap device-level metadata hints from native field
statistics.
@@ -218,11 +227,16 @@ class TsFileSeriesReader:
A table-model DeviceID may only materialize a prefix of the declared
tag columns. Preserve the available prefix rather than requiring a
- full-length tag tuple here.
+ full-length tag tuple here. Some backends may still materialize
+ trailing missing tags as explicit ``None`` values; normalize those
+ back to the same prefix representation.
"""
if tag_count == 0:
return ()
-        return tuple(group.segments[1 : min(len(group.segments), 1 + tag_count)])
+        values = list(group.segments[1 : min(len(group.segments), 1 + tag_count)])
+ while values and values[-1] is None:
+ values.pop()
+ return tuple(values)
@staticmethod
def _metadata_field_stats(group) -> Dict[str, dict]:
@@ -308,25 +322,50 @@ class TsFileSeriesReader:
def read_series_by_row(self, device_id: int, field_idx: int, offset: int, limit: int) -> Tuple[np.ndarray, np.ndarray]:
"""Read one logical series by device-local row offset/limit."""
+ if limit <= 0:
+ return np.array([], dtype=np.int64), np.array([], dtype=np.float64)
+
table_entry, device_entry, field_name = self._resolve_series_ref(device_id, field_idx)
tag_values = dict(zip(table_entry.tag_columns, device_entry.tag_values))
tag_filter = _build_exact_tag_filter(tag_values) if tag_values else None
- timestamps = []
- values = []
- with self._reader.query_table_by_row(
- table_entry.table_name,
- [field_name],
- offset=offset,
- limit=limit,
- tag_filter=tag_filter,
- ) as result_set:
- while result_set.next():
- timestamps.append(result_set.get_value_by_name("time"))
- value = result_set.get_value_by_name(field_name)
- values.append(np.nan if value is None else float(value))
+ # Some native row-query paths stop at an internal block boundary even
+ # when the requested window extends further. Re-issue from the advanced
+ # offset until we fill the caller's logical row window or reach EOF.
+ timestamp_parts = []
+ value_parts = []
+ remaining = limit
+ next_offset = offset
+
+ while remaining > 0:
+ batch_timestamps = []
+ batch_values = []
+ with self._reader.query_table_by_row(
+ table_entry.table_name,
+ [field_name],
+ offset=next_offset,
+ limit=remaining,
+ tag_filter=tag_filter,
+ ) as result_set:
+ while result_set.next():
+                    batch_timestamps.append(result_set.get_value_by_name("time"))
+ value = result_set.get_value_by_name(field_name)
+                    batch_values.append(np.nan if value is None else float(value))
+
+ if not batch_timestamps:
+ break
+
+            timestamp_parts.append(np.asarray(batch_timestamps, dtype=np.int64))
+ value_parts.append(np.asarray(batch_values, dtype=np.float64))
+ read_count = len(batch_timestamps)
+ next_offset += read_count
+ remaining -= read_count
-        return np.asarray(timestamps, dtype=np.int64), np.asarray(values, dtype=np.float64)
+ if not timestamp_parts:
+ return np.array([], dtype=np.int64), np.array([], dtype=np.float64)
+ if len(timestamp_parts) == 1:
+ return timestamp_parts[0], value_parts[0]
+ return np.concatenate(timestamp_parts), np.concatenate(value_parts)
def read_device_fields_by_time_range(
self, device_id: int, field_indices: List[int], start_time: int, end_time: int
@@ -394,7 +433,13 @@ class TsFileSeriesReader:
{field_column: np.array([], dtype=np.float64) for field_column in field_columns},
)
- return (
- np.concatenate(timestamp_parts).astype(np.int64),
-            {field_column: np.concatenate(field_parts[field_column]) for field_column in field_columns},
- )
+ timestamps = np.concatenate(timestamp_parts).astype(np.int64)
+        field_values = {field_column: np.concatenate(field_parts[field_column]) for field_column in field_columns}
+
+ # Keep the dataset layer strict about the requested time window even if
+ # the underlying query path returns boundary-adjacent null rows.
+ mask = (timestamps >= start_time) & (timestamps <= end_time)
+ timestamps = timestamps[mask]
+        field_values = {field_column: values[mask] for field_column, values in field_values.items()}
+
+ return timestamps, field_values
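
A self-contained model of the re-issue loop above, assuming a native block
boundary at row 10 as in the fake-reader test (query_rows stands in for the
native row query):

    def query_rows(offset, limit, boundary=10, total=30):
        stop = min(offset + limit, total)
        chunk_stop = min(stop, ((offset // boundary) + 1) * boundary)
        return list(range(offset, chunk_stop))  # stops at the boundary

    rows, next_offset, remaining = [], 5, 12
    while remaining > 0:
        batch = query_rows(next_offset, remaining)
        if not batch:
            break
        rows += batch
        next_offset += len(batch)
        remaining -= len(batch)
    assert rows == list(range(5, 17))  # rows 5..16 filled in two native calls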
diff --git a/python/tsfile/tsfile_py_cpp.pxd b/python/tsfile/tsfile_py_cpp.pxd
index 55da081c..7286e78f 100644
--- a/python/tsfile/tsfile_py_cpp.pxd
+++ b/python/tsfile/tsfile_py_cpp.pxd
@@ -54,7 +54,7 @@ cdef public api ResultSet tsfile_reader_query_table_on_tree_c(TsFileReader reade
int64_t start_time, int64_t end_time)
cdef public api ResultSet tsfile_reader_query_table_batch_c(TsFileReader reader, object table_name, object column_list,
int64_t start_time, int64_t end_time,
-                                                             TagFilterHandle tag_filter, int batch_size)
+                                                             void* tag_filter, int batch_size)
cdef public api ResultSet tsfile_reader_query_paths_c(TsFileReader reader, object device_name, object sensor_list, int64_t start_time,
int64_t end_time)
@@ -64,11 +64,11 @@ cdef public api ResultSet tsfile_reader_query_tree_by_row_c(TsFileReader reader,
cdef public api ResultSet tsfile_reader_query_table_by_row_c(TsFileReader reader, object table_name,
object column_list, int offset,
-                                                              int limit, TagFilterHandle tag_filter,
+                                                              int limit, void* tag_filter,
int batch_size)
cdef public api ResultSet tsfile_reader_query_table_with_tag_filter_c(TsFileReader reader, object table_name,
object column_list, int64_t start_time,
-                                                                    int64_t end_time, TagFilterHandle tag_filter,
+                                                                    int64_t end_time, void* tag_filter,
int batch_size)
cdef public api object get_table_schema(TsFileReader reader, object table_name)
@@ -78,4 +78,4 @@ cdef public api object reader_get_all_devices_c(TsFileReader reader)
cdef public api object reader_get_timeseries_metadata_c(TsFileReader reader, object device_ids)
cpdef public api object get_tsfile_config()
-cpdef public api void set_tsfile_config(dict new_config)
\ No newline at end of file
+cpdef public api void set_tsfile_config(dict new_config)
diff --git a/python/tsfile/tsfile_py_cpp.pyx b/python/tsfile/tsfile_py_cpp.pyx
index 70518b70..1aea243e 100644
--- a/python/tsfile/tsfile_py_cpp.pyx
+++ b/python/tsfile/tsfile_py_cpp.pyx
@@ -811,7 +811,7 @@ cdef ResultSet tsfile_reader_query_table_by_row_c(TsFileReader reader,
object table_name,
object column_list,
int offset, int limit,
- TagFilterHandle tag_filter,
+ void* tag_filter,
int batch_size):
cdef ResultSet result
cdef int column_num = len(column_list)
@@ -844,7 +844,7 @@ cdef ResultSet tsfile_reader_query_table_by_row_c(TsFileReader reader,
columns = NULL
cdef ResultSet tsfile_reader_query_table_batch_c(TsFileReader reader, object table_name, object column_list,
-                                                 int64_t start_time, int64_t end_time, TagFilterHandle tag_filter,
+                                                 int64_t start_time, int64_t end_time, void* tag_filter,
int batch_size):
cdef ResultSet result
cdef int column_num = len(column_list)
@@ -906,7 +906,7 @@ cdef ResultSet tsfile_reader_query_paths_c(TsFileReader reader, object device_na
cdef ResultSet tsfile_reader_query_table_with_tag_filter_c(TsFileReader reader, object table_name,
object column_list, int64_t start_time,
-                                                           int64_t end_time, TagFilterHandle tag_filter,
+                                                           int64_t end_time, void* tag_filter,
int batch_size):
cdef ResultSet result
cdef int column_num = len(column_list)