This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new 21d70d65 Support TsfileDataFrame (#765)
21d70d65 is described below
commit 21d70d65764386548ed6fd9fd4c428346db8003e
Author: YangCaiyin <[email protected]>
AuthorDate: Wed Apr 8 12:04:54 2026 +0800
Support TsfileDataFrame (#765)
* support tsfileDataFrame
* make tsfileDataFrame a module
* support metadata management and optimize implementation
* support show() method in tsdf
* fix typo and address close problem
---
python/setup.py | 2 +-
python/tests/test_tsfile_dataset.py | 397 ++++++++++++++++++++++
python/tsfile/__init__.py | 3 +-
python/tsfile/dataset/__init__.py | 24 ++
python/tsfile/dataset/dataframe.py | 635 ++++++++++++++++++++++++++++++++++++
python/tsfile/dataset/formatting.py | 171 ++++++++++
python/tsfile/dataset/merge.py | 154 +++++++++
python/tsfile/dataset/metadata.py | 227 +++++++++++++
python/tsfile/dataset/reader.py | 354 ++++++++++++++++++++
python/tsfile/dataset/timeseries.py | 155 +++++++++
10 files changed, 2120 insertions(+), 2 deletions(-)
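For context, a minimal usage sketch of the new dataset API, distilled from the tests added in this commit (file names and values are illustrative):

    import numpy as np
    from tsfile import TsFileDataFrame, Timeseries, AlignedTimeseries

    # Open one or more TsFile shards; a directory containing .tsfile files also works.
    with TsFileDataFrame(["part1.tsfile", "part2.tsfile"], show_progress=False) as tsdf:
        print(tsdf.list_timeseries())      # e.g. ["weather.device_a.temperature", ...]
        series = tsdf[0]                   # Timeseries: lazy, per-series access
        first, last = series[0], series[-1]
        fields = tsdf["field"]             # pandas Series of per-series metadata
        subset = tsdf[:1]                  # TsFileDataFrame view over a subset
        aligned = tsdf.loc[0:5, [0, 1]]    # AlignedTimeseries: NaN-filled value matrix
        aligned.show(2)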
diff --git a/python/setup.py b/python/setup.py
index 759abab3..16fc7aa8 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -144,7 +144,7 @@ exts = [
setup(
name="tsfile",
version=version,
- packages=["tsfile"],
+ packages=["tsfile", "tsfile.dataset"],
package_dir={"": "."},
include_package_data=True,
ext_modules=cythonize(exts, compiler_directives={"language_level": 3}),
diff --git a/python/tests/test_tsfile_dataset.py b/python/tests/test_tsfile_dataset.py
new file mode 100644
index 00000000..63cd439a
--- /dev/null
+++ b/python/tests/test_tsfile_dataset.py
@@ -0,0 +1,397 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import numpy as np
+import pandas as pd
+import pytest
+
+from tsfile import ColumnCategory, ColumnSchema, TSDataType, TableSchema, TsFileTableWriter
+from tsfile import AlignedTimeseries, Timeseries, TsFileDataFrame
+from tsfile.dataset.formatting import format_timestamp
+from tsfile.dataset.reader import TsFileSeriesReader
+
+
+def _write_weather_file(path, start):
+ schema = TableSchema(
+ "weather",
+ [
+ ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
+ ColumnSchema("temperature", TSDataType.DOUBLE, ColumnCategory.FIELD),
+ ColumnSchema("humidity", TSDataType.DOUBLE, ColumnCategory.FIELD),
+ ],
+ )
+ df = pd.DataFrame(
+ {
+ "time": [start, start + 1, start + 2],
+ "device": ["device_a", "device_a", "device_a"],
+ "temperature": [20.0, 21.5, 23.0],
+ "humidity": [50.0, 52.0, 55.0],
+ }
+ )
+ with TsFileTableWriter(str(path), schema) as writer:
+ writer.write_dataframe(df)
+
+
+def _write_numeric_and_text_file(path):
+ schema = TableSchema(
+ "weather",
+ [
+ ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
+ ColumnSchema("temperature", TSDataType.DOUBLE, ColumnCategory.FIELD),
+ ColumnSchema("status", TSDataType.STRING, ColumnCategory.FIELD),
+ ],
+ )
+ df = pd.DataFrame(
+ {
+ "time": [0, 1, 2],
+ "device": ["device_a", "device_a", "device_a"],
+ "temperature": [20.0, np.nan, 23.5],
+ "status": ["ok", "warn", "ok"],
+ }
+ )
+ with TsFileTableWriter(str(path), schema) as writer:
+ writer.write_dataframe(df)
+
+
+def _write_partial_numeric_rows_file(path):
+ schema = TableSchema(
+ "weather",
+ [
+ ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
+ ColumnSchema("temperature", TSDataType.DOUBLE, ColumnCategory.FIELD),
+ ColumnSchema("humidity", TSDataType.DOUBLE, ColumnCategory.FIELD),
+ ],
+ )
+ df = pd.DataFrame(
+ {
+ "time": [0, 1],
+ "device": ["device_a", "device_a"],
+ "temperature": [np.nan, 21.0],
+ "humidity": [50.0, 51.0],
+ }
+ )
+ with TsFileTableWriter(str(path), schema) as writer:
+ writer.write_dataframe(df)
+
+
+def _write_weather_with_extra_field_file(path, start):
+ schema = TableSchema(
+ "weather",
+ [
+ ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
+ ColumnSchema("temperature", TSDataType.DOUBLE, ColumnCategory.FIELD),
+ ColumnSchema("humidity", TSDataType.DOUBLE, ColumnCategory.FIELD),
+ ColumnSchema("pressure", TSDataType.DOUBLE, ColumnCategory.FIELD),
+ ],
+ )
+ df = pd.DataFrame(
+ {
+ "time": [start, start + 1],
+ "device": ["device_a", "device_a"],
+ "temperature": [20.0, 21.0],
+ "humidity": [50.0, 51.0],
+ "pressure": [1000.0, 1001.0],
+ }
+ )
+ with TsFileTableWriter(str(path), schema) as writer:
+ writer.write_dataframe(df)
+
+
+def _write_multi_tag_file(path):
+ schema = TableSchema(
+ "weather",
+ [
+ ColumnSchema("city", TSDataType.STRING, ColumnCategory.TAG),
+ ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
+ ColumnSchema("temperature", TSDataType.DOUBLE, ColumnCategory.FIELD),
+ ColumnSchema("humidity", TSDataType.DOUBLE, ColumnCategory.FIELD),
+ ColumnSchema("status", TSDataType.STRING, ColumnCategory.FIELD),
+ ],
+ )
+ df = pd.DataFrame(
+ {
+ "time": [0, 1, 0, 1],
+ "city": ["beijing", "beijing", "shanghai", "shanghai"],
+ "device": ["device_a", "device_a", "device_b", "device_b"],
+ "temperature": [20.0, 21.0, 24.0, 25.0],
+ "humidity": [50.0, 51.0, 60.0, 61.0],
+ "status": ["ok", "ok", "warn", "warn"],
+ }
+ )
+ with TsFileTableWriter(str(path), schema) as writer:
+ writer.write_dataframe(df)
+
+
+def _write_special_tag_file(path):
+ schema = TableSchema(
+ "weather",
+ [
+ ColumnSchema("city", TSDataType.STRING, ColumnCategory.TAG),
+ ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
+ ColumnSchema("temperature", TSDataType.DOUBLE, ColumnCategory.FIELD),
+ ],
+ )
+ df = pd.DataFrame(
+ {
+ "time": [0, 1],
+ "city": ["bei.jing", "bei.jing"],
+ "device": [r"dev\1", r"dev\1"],
+ "temperature": [20.0, 21.0],
+ }
+ )
+ with TsFileTableWriter(str(path), schema) as writer:
+ writer.write_dataframe(df)
+
+
+def test_dataset_top_level_imports():
+ assert TsFileDataFrame.__module__ == "tsfile.dataset.dataframe"
+ assert Timeseries.__module__ == "tsfile.dataset.timeseries"
+ assert AlignedTimeseries.__module__ == "tsfile.dataset.timeseries"
+
+
+def test_format_timestamp_preserves_millisecond_precision():
+ assert "." not in format_timestamp(1000)
+ assert format_timestamp(1).endswith(".001")
+
+
+def test_dataset_basic_access_patterns(tmp_path, capsys):
+ path1 = tmp_path / "part1.tsfile"
+ path2 = tmp_path / "part2.tsfile"
+ _write_weather_file(path1, 0)
+ _write_weather_file(path2, 3)
+
+ with TsFileDataFrame([str(path1), str(path2)], show_progress=False) as tsdf:
+ assert len(tsdf) == 2
+
+ first = tsdf[0]
+ assert isinstance(first, Timeseries)
+ assert first.name in tsdf.list_timeseries()
+ assert len(first) == 6
+ assert first[0] == 20.0
+ assert first[-1] == 23.0
+ assert "Timeseries(" in repr(first)
+
+ by_name = tsdf[first.name]
+ assert isinstance(by_name, Timeseries)
+ assert by_name.name == first.name
+
+ subset = tsdf[:1]
+ assert isinstance(subset, TsFileDataFrame)
+ assert len(subset) == 1
+
+ selected = tsdf[[0, 1]]
+ assert isinstance(selected, TsFileDataFrame)
+ assert len(selected) == 2
+
+ aligned = tsdf.loc[0:5, [0, 1]]
+ assert isinstance(aligned, AlignedTimeseries)
+ assert aligned.shape == (6, 2)
+
+ aligned_negative = tsdf.loc[0:5, [-1]]
+ assert isinstance(aligned_negative, AlignedTimeseries)
+ assert aligned_negative.shape == (6, 1)
+
+ assert list(tsdf["field"]) == ["temperature", "humidity"]
+
+ assert "TsFileDataFrame(2 time series, 2 files)" in repr(tsdf)
+ aligned.show(2)
+ assert "AlignedTimeseries(6 rows, 2 series)" in capsys.readouterr().out
+
+
+def test_dataset_exposes_only_numeric_fields_and_keeps_nan(tmp_path):
+ path = tmp_path / "numeric_and_text.tsfile"
+ _write_numeric_and_text_file(path)
+
+ with TsFileDataFrame(str(path), show_progress=False) as tsdf:
+ assert tsdf.list_timeseries() == ["weather.device_a.temperature"]
+
+ series = tsdf[0]
+ assert series.name == "weather.device_a.temperature"
+ assert np.isnan(series[1])
+ sliced = series[:3]
+ assert sliced.shape == (3,)
+ assert np.isnan(sliced[1])
+ assert series[1:1].shape == (0,)
+
+
+def test_dataset_timeseries_supports_negative_step_slices(tmp_path):
+ path = tmp_path / "weather.tsfile"
+ _write_weather_file(path, 0)
+
+ with TsFileDataFrame(str(path), show_progress=False) as tsdf:
+ series = tsdf[0]
+ np.testing.assert_array_equal(series[::-1], np.array([23.0, 21.5, 20.0]))
+ np.testing.assert_array_equal(series[::-2], np.array([23.0, 20.0]))
+
+
+def test_dataset_metadata_discovery_uses_all_numeric_fields(tmp_path):
+ path = tmp_path / "partial_numeric_rows.tsfile"
+ _write_partial_numeric_rows_file(path)
+
+ with TsFileDataFrame(str(path), show_progress=False) as tsdf:
+ assert tsdf.list_timeseries() == [
+ "weather.device_a.temperature",
+ "weather.device_a.humidity",
+ ]
+
+ assert list(tsdf["count"]) == [2, 2]
+ assert list(tsdf["start_time"]) == [0, 0]
+ assert list(tsdf["end_time"]) == [1, 1]
+
+
+def test_dataset_rejects_duplicate_timestamps_across_shards(tmp_path):
+ path1 = tmp_path / "part1.tsfile"
+ path2 = tmp_path / "part2.tsfile"
+ _write_weather_file(path1, 0)
+ _write_weather_file(path2, 2)
+
+ with pytest.raises(ValueError, match="Duplicate timestamp"):
+ TsFileDataFrame([str(path1), str(path2)], show_progress=False)
+
+
+def test_dataset_rejects_data_access_after_close(tmp_path):
+ path = tmp_path / "weather.tsfile"
+ _write_weather_file(path, 0)
+
+ tsdf = TsFileDataFrame(str(path), show_progress=False)
+ series = tsdf[0]
+ tsdf.close()
+
+ with pytest.raises(RuntimeError, match="TsFileDataFrame is closed"):
+ _ = tsdf[0]
+
+ with pytest.raises(RuntimeError, match="TsFileDataFrame is closed"):
+ _ = series[0]
+
+
+def test_subset_close_warns_and_does_not_close_root(tmp_path):
+ path = tmp_path / "weather.tsfile"
+ _write_weather_file(path, 0)
+
+ with TsFileDataFrame(str(path), show_progress=False) as tsdf:
+ subset = tsdf[:1]
+ with pytest.warns(RuntimeWarning, match="no-op"):
+ subset.close()
+
+ series = tsdf[0]
+ assert series[0] == 20.0
+
+
+def test_dataset_rejects_incompatible_table_schemas_across_shards(tmp_path):
+ path1 = tmp_path / "part1.tsfile"
+ path2 = tmp_path / "part2.tsfile"
+ _write_weather_file(path1, 0)
+ _write_weather_with_extra_field_file(path2, 2)
+
+ with pytest.raises(ValueError, match="Incompatible schema for table 'weather'"):
+ TsFileDataFrame([str(path1), str(path2)], show_progress=False)
+
+
+def test_dataset_multi_tag_metadata_discovery(tmp_path):
+ path = tmp_path / "multi_tag.tsfile"
+ _write_multi_tag_file(path)
+
+ with TsFileDataFrame(str(path), show_progress=False) as tsdf:
+ assert tsdf.list_timeseries() == [
+ "weather.beijing.device_a.temperature",
+ "weather.beijing.device_a.humidity",
+ "weather.shanghai.device_b.temperature",
+ "weather.shanghai.device_b.humidity",
+ ]
+
+ summary = pd.DataFrame(
+ {
+ "series_path": tsdf.list_timeseries(),
+ "table": tsdf["table"],
+ "city": tsdf["city"],
+ "device": tsdf["device"],
+ "field": tsdf["field"],
+ "start_time": tsdf["start_time"],
+ "end_time": tsdf["end_time"],
+ "count": tsdf["count"],
+ }
+ ).sort_values(["city", "device", "field"]).reset_index(drop=True)
+ assert list(summary.columns) == [
+ "series_path",
+ "table",
+ "city",
+ "device",
+ "field",
+ "start_time",
+ "end_time",
+ "count",
+ ]
+ assert list(summary["city"]) == ["beijing", "beijing", "shanghai", "shanghai"]
+ assert list(summary["device"]) == ["device_a", "device_a", "device_b", "device_b"]
+ assert list(summary["field"]) == ["humidity", "temperature", "humidity", "temperature"]
+ assert list(summary["count"]) == [2, 2, 2, 2]
+
+
+def test_dataset_series_paths_escape_special_tag_values(tmp_path):
+ path = tmp_path / "special_tag.tsfile"
+ _write_special_tag_file(path)
+
+ with TsFileDataFrame(str(path), show_progress=False) as tsdf:
+ expected_path = r"weather.bei\.jing.dev\\1.temperature"
+ assert tsdf.list_timeseries() == [expected_path]
+
+ series = tsdf[expected_path]
+ assert isinstance(series, Timeseries)
+ assert series.name == expected_path
+ assert list(tsdf["city"]) == ["bei.jing"]
+ assert list(tsdf["device"]) == [r"dev\1"]
+
+
+def test_reader_series_paths_escape_special_tag_values(tmp_path):
+ path = tmp_path / "special_tag.tsfile"
+ _write_special_tag_file(path)
+
+ reader = TsFileSeriesReader(str(path), show_progress=False)
+ try:
+ expected_path = r"weather.bei\.jing.dev\\1.temperature"
+ assert reader.series_paths == [expected_path]
+ info = reader.get_series_info(expected_path)
+ assert info["tag_values"] == {"city": "bei.jing", "device": r"dev\1"}
+ finally:
+ reader.close()
+
+
+def test_reader_catalog_shares_device_metadata_and_resolves_paths(tmp_path):
+ path = tmp_path / "weather.tsfile"
+ _write_weather_file(path, 100)
+
+ reader = TsFileSeriesReader(str(path), show_progress=False)
+ try:
+ assert reader.series_paths == [
+ "weather.device_a.temperature",
+ "weather.device_a.humidity",
+ ]
+ assert len(reader.catalog.table_entries) == 1
+ assert len(reader.catalog.device_entries) == 1
+ assert reader.catalog.series_count == 2
+
+ by_path = reader.get_series_info("weather.device_a.temperature")
+ by_ref = reader.get_series_info_by_ref(0, 0)
+ assert by_ref == by_path
+ assert by_ref["tag_values"] == {"device": "device_a"}
+
+ ts_by_path = reader.get_series_timestamps("weather.device_a.temperature")
+ ts_by_device = reader.get_device_timestamps(0)
+ np.testing.assert_array_equal(ts_by_path, ts_by_device)
+ finally:
+ reader.close()
diff --git a/python/tsfile/__init__.py b/python/tsfile/__init__.py
index 55fa3b9e..84ca330e 100644
--- a/python/tsfile/__init__.py
+++ b/python/tsfile/__init__.py
@@ -40,4 +40,5 @@ from .tsfile_reader import TsFileReaderPy as TsFileReader, ResultSetPy as Result
from .tsfile_writer import TsFileWriterPy as TsFileWriter
from .tsfile_py_cpp import get_tsfile_config, set_tsfile_config
from .tsfile_table_writer import TsFileTableWriter
-from .utils import to_dataframe, dataframe_to_tsfile
\ No newline at end of file
+from .utils import to_dataframe, dataframe_to_tsfile
+from .dataset import TsFileDataFrame, Timeseries, AlignedTimeseries
diff --git a/python/tsfile/dataset/__init__.py b/python/tsfile/dataset/__init__.py
new file mode 100644
index 00000000..4072bd4c
--- /dev/null
+++ b/python/tsfile/dataset/__init__.py
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""Dataset-style TsFile accessors."""
+
+from .dataframe import TsFileDataFrame
+from .timeseries import AlignedTimeseries, Timeseries
+
+__all__ = ["TsFileDataFrame", "Timeseries", "AlignedTimeseries"]
diff --git a/python/tsfile/dataset/dataframe.py b/python/tsfile/dataset/dataframe.py
new file mode 100644
index 00000000..e28a0e5e
--- /dev/null
+++ b/python/tsfile/dataset/dataframe.py
@@ -0,0 +1,635 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""Top-level dataset accessors for TsFile shards."""
+
+from collections import defaultdict
+from dataclasses import dataclass, field
+import os
+import sys
+from typing import Dict, List, Set, Tuple, Union
+import warnings
+
+import numpy as np
+
+from .formatting import format_dataframe_table
+from .metadata import TableEntry, _coerce_path_component, build_logical_series_path, split_logical_series_path
+from .merge import build_aligned_matrix, merge_time_value_parts, merge_timestamp_parts
+from .timeseries import AlignedTimeseries, Timeseries
+
+
+DeviceKey = Tuple[str, tuple]
+SeriesRefKey = Tuple[int, int]
+SeriesRef = Tuple[object, int, int]
+DeviceRef = Tuple[object, int]
+
+_QUERY_START = np.iinfo(np.int64).min
+_QUERY_END = np.iinfo(np.int64).max
+
+
+@dataclass(slots=True)
+class _LogicalIndex:
+ """Cross-reader logical mapping for devices and series."""
+
+ # Shared table schema references keyed by table name.
+ table_entries: Dict[str, TableEntry] = field(default_factory=dict)
+
+ # Stable logical device order, each item is (table_name, tag_values).
+ device_order: List[DeviceKey] = field(default_factory=list)
+ # Map one logical device key to its dataframe-local device index.
+ device_index_by_key: Dict[DeviceKey, int] = field(default_factory=dict)
+ # For each logical device, keep the contributing reader-local device refs.
+ device_refs: List[List[DeviceRef]] = field(default_factory=list)
+
+ # Stable logical series order, each item is (device_idx, field_idx).
+ series_refs_ordered: List[SeriesRefKey] = field(default_factory=list)
+ # Map one logical series ref to the contributing reader-local series refs.
+ series_ref_map: Dict[SeriesRefKey, List[SeriesRef]] = field(default_factory=dict)
+ # Fast membership check for resolved series refs.
+ series_ref_set: Set[SeriesRefKey] = field(default_factory=set)
+
+
+@dataclass(slots=True)
+class _DerivedCache:
+ """Merged metadata derived from the logical index."""
+
+ devices: List[dict] = field(default_factory=list)
+ field_stats: Dict[SeriesRefKey, dict] = field(default_factory=dict)
+
+
+def _expand_paths(paths: Union[str, List[str]]) -> List[str]:
+ """Normalize file/directory inputs into a validated list of absolute TsFile paths."""
+ if isinstance(paths, str):
+ paths = [paths]
+
+ expanded = []
+ for path in paths:
+ if os.path.isdir(path):
+ tsfiles = sorted(
+ os.path.join(root, name)
+ for root, _, files in os.walk(path)
+ for name in files
+ if name.endswith(".tsfile")
+ )
+ if not tsfiles:
+ raise FileNotFoundError(f"No .tsfile files found in directory: {path}")
+ expanded.extend(tsfiles)
+ else:
+ expanded.append(path)
+
+ resolved = []
+ for path in expanded:
+ if not os.path.exists(path):
+ raise FileNotFoundError(f"TsFile not found: {path}")
+ resolved.append(os.path.abspath(path))
+ return resolved
+
+
+def _series_lookup_hint(name: str) -> str:
+ return f"Series not found: '{name}'. Use df.list_timeseries() to inspect available series."
+
+
+def _validate_table_schema(existing: TableEntry, incoming: TableEntry, file_path: str) -> None:
+ """Reject same-name tables whose tag/field layout differs across shards."""
+ if (
+ existing.tag_columns == incoming.tag_columns
+ and existing.tag_types == incoming.tag_types
+ and existing.field_columns == incoming.field_columns
+ ):
+ return
+
+ raise ValueError(
+ f"Incompatible schema for table '{incoming.table_name}' in '{file_path}'. "
+ f"Expected tags={list(existing.tag_columns)}, tag_types={list(existing.tag_types)}, "
+ f"fields={list(existing.field_columns)} but found "
+ f"tags={list(incoming.tag_columns)}, tag_types={list(incoming.tag_types)}, "
+ f"fields={list(incoming.field_columns)}."
+ )
+
+
+def _register_reader(
+ readers: Dict[str, object],
+ index: _LogicalIndex,
+ file_path: str,
+ reader,
+) -> None:
+ """Merge one reader's catalog into the dataframe-wide logical index."""
+ readers[file_path] = reader
+ catalog = reader.catalog
+
+ for table_entry in catalog.table_entries:
+ existing_entry = index.table_entries.get(table_entry.table_name)
+ if existing_entry is None:
+ index.table_entries[table_entry.table_name] = table_entry
+ else:
+ _validate_table_schema(existing_entry, table_entry, file_path)
+
+ for device_id, device_entry in enumerate(catalog.device_entries):
+ table_entry = catalog.table_entries[device_entry.table_id]
+ device_key = (table_entry.table_name, tuple(device_entry.tag_values))
+ device_idx = index.device_index_by_key.get(device_key)
+ if device_idx is None:
+ device_idx = len(index.device_order)
+ index.device_index_by_key[device_key] = device_idx
+ index.device_order.append(device_key)
+ index.device_refs.append([])
+ index.device_refs[device_idx].append((reader, device_id))
+
+ for field_idx in range(len(table_entry.field_columns)):
+ series_ref = (device_idx, field_idx)
+ if series_ref not in index.series_ref_map:
+ index.series_refs_ordered.append(series_ref)
+ index.series_ref_map[series_ref] = []
+ index.series_ref_map[series_ref].append((reader, device_id, field_idx))
+
+
+def _build_device_entry(refs: List[DeviceRef]) -> dict:
+ """Compute per-device time bounds after merging all contributing shards."""
+ # [Temporary] It will be replaced by query_by_row and metadata interface in TsFile
+ if len(refs) == 1:
+ merged_timestamps = refs[0][0].get_device_timestamps(refs[0][1])
+ else:
+ merged_timestamps = merge_timestamp_parts(
+ [reader.get_device_timestamps(device_id) for reader, device_id in refs],
+ validate_unique=True,
+ )
+
+ return {
+ "min_time": int(merged_timestamps[0]) if len(merged_timestamps) > 0 else None,
+ "max_time": int(merged_timestamps[-1]) if len(merged_timestamps) > 0 else None,
+ }
+
+
+def _merge_field_timestamps(series_name: str, refs: List[SeriesRef]) -> np.ndarray:
+ """Load and merge the full timestamp axis for one logical series on demand."""
+ # [Temporary] It will be replaced by query_by_row interface in TsFile
+ time_parts = []
+ for reader, device_id, field_idx in refs:
+ ts_arr, _ = reader.read_series_by_ref(device_id, field_idx, _QUERY_START, _QUERY_END)
+ if len(ts_arr) > 0:
+ time_parts.append(ts_arr)
+
+ if not time_parts:
+ merged_timestamps = np.array([], dtype=np.int64)
+ elif len(time_parts) == 1:
+ merged_timestamps = time_parts[0]
+ else:
+ try:
+ merged_timestamps = merge_timestamp_parts(time_parts, validate_unique=True)
+ except ValueError as e:
+ message = str(e)
+ duplicate_suffix = message.removeprefix("Duplicate timestamp ")
+ duplicate_suffix = duplicate_suffix.removesuffix(" found across shards.")
+ raise ValueError(
+ f"Duplicate timestamp {duplicate_suffix} found for series '{series_name}' across shards. "
+ f"Cross-shard duplicate timestamps are not supported."
+ ) from e
+
+ return merged_timestamps
+
+
+def _build_field_stats(refs: List[SeriesRef]) -> dict:
+ """Aggregate cheap per-shard stats without materializing full series values."""
+ min_time = None
+ max_time = None
+ count = 0
+
+ for reader, device_id, field_idx in refs:
+ info = reader.get_series_info_by_ref(device_id, field_idx)
+ shard_min = info["min_time"]
+ shard_max = info["max_time"]
+ shard_count = info["length"]
+
+ if shard_count == 0:
+ continue
+
+ count += shard_count
+ min_time = shard_min if min_time is None else min(min_time, shard_min)
+ max_time = shard_max if max_time is None else max(max_time, shard_max)
+
+ return {
+ "min_time": min_time,
+ "max_time": max_time,
+ "count": count,
+ }
+
+
+class _LocIndexer:
+ """Implement ``.loc[start_time:end_time, series_list]`` for aligned reads."""
+
+ def __init__(self, dataframe: "TsFileDataFrame"):
+ self._df = dataframe
+
+ def _parse_key(self, key):
+ if not isinstance(key, tuple) or len(key) != 2:
+ raise ValueError("loc requires exactly 2 arguments: tsdf.loc[start_time:end_time, series_list]")
+
+ time_slice, series_spec = key
+ if isinstance(time_slice, slice):
+ start_time = _QUERY_START if time_slice.start is None else time_slice.start
+ end_time = _QUERY_END if time_slice.stop is None else time_slice.stop
+ elif isinstance(time_slice, (int, np.integer)):
+ start_time = end_time = int(time_slice)
+ else:
+ raise TypeError(f"Time index must be slice or int, got {type(time_slice)}")
+
+ if isinstance(series_spec, (str, int, np.integer)):
+ series_spec = [series_spec]
+
+ series_refs = []
+ series_names = []
+ for item in series_spec:
+ if isinstance(item, (int, np.integer)):
+ idx = int(item)
+ if idx < 0:
+ idx += len(self._df._index.series_refs_ordered)
+ if idx < 0 or idx >= len(self._df._index.series_refs_ordered):
+ raise IndexError(f"Series index {item} out of range")
+ series_ref = self._df._index.series_refs_ordered[idx]
+ elif isinstance(item, str):
+ series_ref = self._df._resolve_series_name(item)
+ else:
+ raise TypeError(f"Series specifier must be int or str, got {type(item)}")
+ series_refs.append(series_ref)
+ series_names.append(self._df._build_series_name(series_ref))
+
+ return start_time, end_time, series_refs, series_names
+
+ def _query_aligned(self, start_time: int, end_time: int, series_refs: List[SeriesRefKey], series_names: List[str]):
+ """Batch aligned reads by reader/device, then merge per-series fragments."""
+ self._df._assert_open()
+ groups = defaultdict(list)
+ for col_idx, series_ref in enumerate(series_refs):
+ device_idx, field_idx = series_ref
+ device_info = self._df._cache.devices[device_idx]
+ if device_info["max_time"] is None or device_info["max_time"] < start_time or device_info["min_time"] > end_time:
+ continue
+
+ _, table_entry, _ = self._df._get_series_components(series_ref)
+ field_name = table_entry.field_columns[field_idx]
+ for reader, device_id, reader_field_idx in self._df._index.series_ref_map[series_ref]:
+ groups[(id(reader), device_id)].append(
+ (col_idx, reader_field_idx, field_name, series_names[col_idx], reader, device_id)
+ )
+
+ series_time_parts = defaultdict(list)
+ series_value_parts = defaultdict(list)
+ for entries in groups.values():
+ reader = entries[0][4]
+ device_id = entries[0][5]
+ field_indices = list(dict.fromkeys(entry[1] for entry in entries))
+ ts_arr, field_vals = reader.read_device_fields_by_time_range(device_id, field_indices, start_time, end_time)
+ for _, _, field_name, series_name, _, _ in entries:
+ if len(ts_arr) > 0:
+ series_time_parts[series_name].append(ts_arr)
+ series_value_parts[series_name].append(field_vals[field_name])
+
+ series_data = {}
+ for name in series_names:
+ series_data[name] = merge_time_value_parts(series_time_parts[name], series_value_parts[name])
+
+ return build_aligned_matrix(series_names, series_data)
+
+ def __getitem__(self, key) -> AlignedTimeseries:
+ start_time, end_time, series_refs, series_names = self._parse_key(key)
+ timestamps, values = self._query_aligned(start_time, end_time, series_refs, series_names)
+ return AlignedTimeseries(timestamps, values, series_names)
+
+
+class TsFileDataFrame:
+ """Lazy-loaded unified numeric dataset view over multiple TsFile shards."""
+
+ def __init__(self, paths: Union[str, List[str]], show_progress: bool = True):
+ self._paths = _expand_paths(paths)
+ self._show_progress = show_progress
+ self._readers: Dict[str, object] = {}
+ self._index = _LogicalIndex()
+ self._cache = _DerivedCache()
+ self._is_view = False
+ self._root = None
+ self._closed = False
+ self._load_metadata()
+
+ @classmethod
+ def _from_subset(cls, parent: "TsFileDataFrame", series_refs: List[SeriesRefKey]) -> "TsFileDataFrame":
+ """Create a lightweight view that reuses the parent's readers and caches."""
+ obj = object.__new__(cls)
+ obj._root = parent._root if parent._is_view else parent
+ obj._is_view = True
+ obj._paths = parent._paths
+ obj._show_progress = parent._show_progress
+ obj._readers = parent._readers
+ obj._index = _LogicalIndex(
+ table_entries=parent._index.table_entries,
+ device_order=parent._index.device_order,
+ device_index_by_key=parent._index.device_index_by_key,
+ device_refs=parent._index.device_refs,
+ series_refs_ordered=list(series_refs),
+ series_ref_map=parent._index.series_ref_map,
+ series_ref_set=set(series_refs),
+ )
+ obj._cache = _DerivedCache(devices=parent._cache.devices, field_stats=parent._cache.field_stats)
+ obj._closed = False
+ return obj
+
+ def _owner(self) -> "TsFileDataFrame":
+ return self._root if self._is_view else self
+
+ def _assert_open(self):
+ if self._owner()._closed:
+ raise RuntimeError("Current TsFileDataFrame is closed.")
+
+ def _load_metadata(self):
+ """Build the logical cross-file index and the derived per-series caches."""
+ from .reader import TsFileSeriesReader
+
+ if len(self._paths) >= 2:
+ self._load_metadata_parallel(TsFileSeriesReader)
+ else:
+ self._load_metadata_serial(TsFileSeriesReader)
+
+ self._cache.devices = [_build_device_entry(refs) for refs in self._index.device_refs]
+ for series_ref in self._index.series_refs_ordered:
+ self._cache.field_stats[series_ref] = _build_field_stats(self._index.series_ref_map[series_ref])
+
+ self._index.series_ref_set = set(self._index.series_refs_ordered)
+ if not self._index.series_refs_ordered:
+ raise ValueError("No valid time series found in the provided TsFile files")
+
+ def _load_metadata_serial(self, reader_class):
+ for file_path in self._paths:
+ _register_reader(
+ self._readers,
+ self._index,
+ file_path,
+ reader_class(file_path, show_progress=self._show_progress),
+ )
+
+ def _load_metadata_parallel(self, reader_class):
+ from concurrent.futures import ThreadPoolExecutor, as_completed
+
+ def open_file(file_path):
+ return file_path, reader_class(file_path, show_progress=False)
+
+ total = len(self._paths)
+ with ThreadPoolExecutor(max_workers=min(total, os.cpu_count() or 4)) as executor:
+ futures = {executor.submit(open_file, path): path for path in self._paths}
+ results = {}
+ done = 0
+ for future in as_completed(futures):
+ file_path, reader = future.result()
+ results[file_path] = reader
+ done += 1
+ if self._show_progress:
+ sys.stderr.write(f"\rLoading TsFile shards: {done}/{total}")
+ sys.stderr.flush()
+
+ if self._show_progress and total > 0:
+ total_series = sum(reader.series_count for reader in results.values())
+ sys.stderr.write(f"\rLoading TsFile shards: {total}/{total} ({total_series} series) ... done\n")
+ sys.stderr.flush()
+
+ for file_path in self._paths:
+ _register_reader(
+ self._readers,
+ self._index,
+ file_path,
+ results[file_path],
+ )
+
+ def _get_series_components(self, series_ref: SeriesRefKey) -> Tuple[DeviceKey, TableEntry, int]:
+ device_idx, field_idx = series_ref
+ device_key = self._index.device_order[device_idx]
+ return device_key, self._index.table_entries[device_key[0]], field_idx
+
+ def _build_series_name(self, series_ref: SeriesRefKey) -> str:
+ device_key, table_entry, field_idx = self._get_series_components(series_ref)
+ table_name, tag_values = device_key
+ field_name = table_entry.field_columns[field_idx]
+ return build_logical_series_path(table_name, tag_values, field_name)
+
+ def _resolve_series_name(self, series_name: str) -> SeriesRefKey:
+ try:
+ parts = split_logical_series_path(series_name)
+ except ValueError as exc:
+ raise KeyError(_series_lookup_hint(series_name)) from exc
+ if len(parts) < 2:
+ raise KeyError(_series_lookup_hint(series_name))
+
+ table_name = parts[0]
+ if table_name not in self._index.table_entries:
+ raise KeyError(_series_lookup_hint(series_name))
+
+ table_entry = self._index.table_entries[table_name]
+ expected_parts = len(table_entry.tag_columns) + 2
+ if len(parts) != expected_parts:
+ raise KeyError(_series_lookup_hint(series_name))
+
+ field_name = parts[-1]
+ try:
+ field_idx = table_entry.get_field_index(field_name)
+ except ValueError as exc:
+ raise KeyError(_series_lookup_hint(series_name)) from exc
+
+ tag_values = tuple(
+ _coerce_path_component(raw_value, tag_type)
+ for raw_value, tag_type in zip(parts[1:-1], table_entry.tag_types)
+ )
+ device_key = (table_name, tag_values)
+ device_idx = self._index.device_index_by_key.get(device_key)
+ if device_idx is None:
+ raise KeyError(_series_lookup_hint(series_name))
+
+ series_ref = (device_idx, field_idx)
+ if series_ref not in self._index.series_ref_set:
+ raise KeyError(_series_lookup_hint(series_name))
+ return series_ref
+
+ def _build_series_info(self, series_ref: SeriesRefKey) -> dict:
+ device_idx, field_idx = series_ref
+ device_key, table_entry, _ = self._get_series_components(series_ref)
+ field_stats = self._cache.field_stats[series_ref]
+ return {
+ "table_name": table_entry.table_name,
+ "field": table_entry.field_columns[field_idx],
+ "tag_columns": table_entry.tag_columns,
+ "tag_values": dict(zip(table_entry.tag_columns, device_key[1])),
+ "min_time": field_stats["min_time"],
+ "max_time": field_stats["max_time"],
+ "count": field_stats["count"],
+ }
+
+ def __len__(self) -> int:
+ return len(self._index.series_refs_ordered)
+
+ def list_timeseries(self, path_prefix: str = "") -> List[str]:
+ names = [self._build_series_name(series_ref) for series_ref in self._index.series_refs_ordered]
+ if not path_prefix:
+ return names
+ prefix = path_prefix if path_prefix.endswith(".") else path_prefix + "."
+ return [name for name in names if name.startswith(prefix) or name == path_prefix]
+
+ def _get_timeseries(self, series_ref: SeriesRefKey) -> Timeseries:
+ self._assert_open()
+ series_name = self._build_series_name(series_ref)
+ return Timeseries(
+ series_name,
+ self._index.series_ref_map[series_ref],
+ self._cache.field_stats[series_ref],
+ self._assert_open,
+ lambda: _merge_field_timestamps(series_name, self._index.series_ref_map[series_ref]),
+ )
+
+ def __getitem__(self, key):
+ try:
+ import pandas as pd
+
+ if isinstance(key, pd.Series) and key.dtype == bool:
+ selected = [self._index.series_refs_ordered[idx] for idx in key.index[key]]
+ return TsFileDataFrame._from_subset(self, selected)
+ except ImportError:
+ pass
+
+ if isinstance(key, (int, np.integer)):
+ idx = int(key)
+ if idx < 0:
+ idx += len(self._index.series_refs_ordered)
+ if idx < 0 or idx >= len(self._index.series_refs_ordered):
+ raise IndexError(f"Index {idx} out of range [0, {len(self._index.series_refs_ordered)})")
+ return self._get_timeseries(self._index.series_refs_ordered[idx])
+
+ if isinstance(key, str):
+ try:
+ return self._get_timeseries(self._resolve_series_name(key))
+ except KeyError:
+ pass
+
+ valid_columns = {"table", "field", "start_time", "end_time", "count"}
+ valid_columns.update(self._collect_tag_columns())
+ if key not in valid_columns:
+ raise KeyError(_series_lookup_hint(key))
+
+ import pandas as pd
+
+ values = []
+ for series_ref in self._index.series_refs_ordered:
+ info = self._build_series_info(series_ref)
+ if key == "table":
+ values.append(info["table_name"])
+ elif key == "field":
+ values.append(info["field"])
+ elif key == "start_time":
+ values.append(info["min_time"])
+ elif key == "end_time":
+ values.append(info["max_time"])
+ elif key == "count":
+ values.append(info["count"])
+ else:
+ values.append(info["tag_values"].get(key, ""))
+ return pd.Series(values, name=key)
+
+ if isinstance(key, slice):
+ return TsFileDataFrame._from_subset(
+ self,
+ [self._index.series_refs_ordered[idx] for idx in range(*key.indices(len(self._index.series_refs_ordered)))],
+ )
+
+ if isinstance(key, list):
+ selected = []
+ for item in key:
+ if not isinstance(item, (int, np.integer)):
+ raise TypeError(f"List index must contain integers, got {type(item)}")
+ idx = int(item)
+ if idx < 0:
+ idx += len(self._index.series_refs_ordered)
+ if idx < 0 or idx >= len(self._index.series_refs_ordered):
+ raise IndexError(f"Index {item} out of range [0, {len(self._index.series_refs_ordered)})")
+ selected.append(self._index.series_refs_ordered[idx])
+ return TsFileDataFrame._from_subset(self, selected)
+
+ raise TypeError(f"Unsupported key type: {type(key)}")
+
+ @property
+ def loc(self):
+ return _LocIndexer(self)
+
+ def _collect_tag_columns(self) -> List[str]:
+ seen = {}
+ for table_name, _ in self._index.device_order:
+ for column in self._index.table_entries[table_name].tag_columns:
+ seen.setdefault(column, True)
+ return list(seen.keys())
+
+ def _format_table(self, indices=None, max_rows: int = 20) -> str:
+ series_names = []
+ merged_info = {}
+ for series_ref in self._index.series_refs_ordered:
+ series_name = self._build_series_name(series_ref)
+ series_names.append(series_name)
+ merged_info[series_name] = self._build_series_info(series_ref)
+
+ return format_dataframe_table(
+ series_names,
+ merged_info,
+ self._collect_tag_columns(),
+ indices=indices,
+ max_rows=max_rows,
+ )
+
+ def _repr_header(self) -> str:
+ total = len(self._index.series_refs_ordered)
+ if self._is_view:
+ return f"TsFileDataFrame({total} time series, subset of {len(self._root._index.series_refs_ordered)})\n"
+ return f"TsFileDataFrame({total} time series, {len(self._paths)} files)\n"
+
+ def __repr__(self):
+ return self._repr_header() + self._format_table()
+
+ def __str__(self):
+ return self.__repr__()
+
+ def show(self, max_rows: int = 20):
+ print(self._repr_header() + self._format_table(max_rows=max_rows))
+
+ def close(self):
+ if self._is_view:
+ warnings.warn(
+ "close() on a subset TsFileDataFrame is a no-op; only the root dataframe owns the readers.",
+ RuntimeWarning,
+ stacklevel=2,
+ )
+ return
+ if self._closed:
+ return
+ for reader in self._readers.values():
+ reader.close()
+ self._readers.clear()
+ self._closed = True
+
+ def __del__(self):
+ try:
+ if not getattr(self, "_is_view", False):
+ self.close()
+ except Exception:
+ pass
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *args):
+ self.close()
diff --git a/python/tsfile/dataset/formatting.py b/python/tsfile/dataset/formatting.py
new file mode 100644
index 00000000..5e01bb39
--- /dev/null
+++ b/python/tsfile/dataset/formatting.py
@@ -0,0 +1,171 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""String formatting helpers for dataset objects."""
+
+from datetime import datetime
+from typing import Dict, List, Optional
+
+import numpy as np
+
+
+def format_timestamp(ts_ms: int) -> str:
+ """Convert millisecond timestamp to human-readable string."""
+ try:
+ dt = datetime.fromtimestamp(ts_ms / 1000)
+ if ts_ms % 1000 == 0:
+ return dt.strftime("%Y-%m-%d %H:%M:%S")
+ return dt.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
+ except (OSError, ValueError, TypeError):
+ return str(ts_ms)
+
+
+def format_aligned_timeseries(
+ timestamps: np.ndarray,
+ values: np.ndarray,
+ series_names: List[str],
+ max_rows: Optional[int],
+) -> str:
+ """Render a table-like string for aligned query results.
+
+ Uses head/tail truncation with '...' when rows exceed *max_rows*,
+ consistent with format_dataframe_table.
+ """
+ n_rows, n_cols = values.shape
+ if n_rows == 0:
+ return f"AlignedTimeseries(0 rows, {n_cols} series)"
+
+ # Determine which rows to render (head + tail when truncated)
+ if max_rows is not None and n_rows > max_rows:
+ head = max_rows // 2
+ tail = max_rows - head
+ show_indices = list(range(head)) + list(range(n_rows - tail, n_rows))
+ truncated = True
+ else:
+ show_indices = list(range(n_rows))
+ truncated = False
+
+ # Only format the rows we will actually display
+ ts_strs = {i: format_timestamp(int(timestamps[i])) for i in show_indices}
+ ts_width = max(max((len(s) for s in ts_strs.values()), default=0), len("time"))
+
+ col_widths = []
+ rendered_values = [] # list of dicts: row_idx -> cell string
+ for col_idx in range(n_cols):
+ col_name = series_names[col_idx] if col_idx < len(series_names) else f"col_{col_idx}"
+ width = len(col_name)
+ column = {}
+ for row_idx in show_indices:
+ value = values[row_idx, col_idx]
+ cell = "NaN" if np.isnan(value) else f"{value:.2f}"
+ column[row_idx] = cell
+ width = max(width, len(cell))
+ rendered_values.append(column)
+ col_widths.append(width)
+
+ header = ["time".rjust(ts_width)]
+ for col_idx, width in enumerate(col_widths):
+ col_name = series_names[col_idx] if col_idx < len(series_names) else f"col_{col_idx}"
+ header.append(col_name.rjust(width))
+ lines = [" ".join(header)]
+
+ head_count = max_rows // 2 if truncated else len(show_indices)
+ for i, row_idx in enumerate(show_indices):
+ if truncated and i == head_count:
+ lines.append("...")
+ parts = [ts_strs[row_idx].rjust(ts_width)]
+ for col_idx, width in enumerate(col_widths):
+ parts.append(rendered_values[col_idx][row_idx].rjust(width))
+ lines.append(" ".join(parts))
+
+ return f"AlignedTimeseries({n_rows} rows, {n_cols} series)\n" + "\n".join(lines)
+
+
+def format_dataframe_table(
+ series_list: List[str],
+ merged_info: Dict[str, dict],
+ tag_columns: List[str],
+ indices: Optional[List[int]] = None,
+ max_rows: int = 20,
+) -> str:
+ """Render the metadata table used by TsFileDataFrame.__repr__."""
+ if indices is None:
+ indices = list(range(len(series_list)))
+ else:
+ indices = list(indices)
+
+ total = len(indices)
+ if total > max_rows:
+ show_indices = list(indices[: max_rows // 2]) + list(indices[-max_rows // 2 :])
+ truncated = True
+ else:
+ show_indices = indices
+ truncated = False
+
+ rows = []
+ for idx in show_indices:
+ name = series_list[idx]
+ info = merged_info[name]
+ row = {
+ "index": idx,
+ "table": info["table_name"],
+ "field": info["field"],
+ "start_time": format_timestamp(info["min_time"]),
+ "end_time": format_timestamp(info["max_time"]),
+ "count": info["count"],
+ }
+ for tag_col in tag_columns:
+ row[tag_col] = info["tag_values"].get(tag_col, "")
+ rows.append(row)
+
+ if not rows:
+ return "Empty TsFileDataFrame"
+
+ headers = ["", "table"] + tag_columns + ["field", "start_time", "end_time", "count"]
+ widths = {header: len(header) for header in headers}
+ widths[""] = max(len(str(row["index"])) for row in rows)
+
+ for row in rows:
+ widths[""] = max(widths[""], len(str(row["index"])))
+ widths["table"] = max(widths["table"], len(row["table"]))
+ widths["field"] = max(widths["field"], len(row["field"]))
+ widths["start_time"] = max(widths["start_time"], len(row["start_time"]))
+ widths["end_time"] = max(widths["end_time"], len(row["end_time"]))
+ widths["count"] = max(widths["count"], len(str(row["count"])))
+ for tag_col in tag_columns:
+ widths[tag_col] = max(widths[tag_col], len(str(row[tag_col])))
+
+ lines = [" ".join(header.rjust(widths[header]) for header in headers)]
+ split = len(rows) // 2 if truncated else len(rows)
+ for row_idx, row in enumerate(rows):
+ if truncated and row_idx == split:
+ lines.append("...")
+ parts = [str(row["index"]).rjust(widths[""]), row["table"].rjust(widths["table"])]
+ for tag_col in tag_columns:
+ parts.append(str(row[tag_col]).rjust(widths[tag_col]))
+ parts.extend(
+ [
+ row["field"].rjust(widths["field"]),
+ row["start_time"].rjust(widths["start_time"]),
+ row["end_time"].rjust(widths["end_time"]),
+ str(row["count"]).rjust(widths["count"]),
+ ]
+ )
+ lines.append(" ".join(parts))
+
+ return "\n".join(lines)
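A quick illustration of format_timestamp above (the rendered string depends on the local time zone, hence the "e.g."):

    from tsfile.dataset.formatting import format_timestamp

    # Whole seconds drop the fractional part; otherwise millisecond precision is kept.
    print(format_timestamp(1000))  # e.g. "1970-01-01 00:00:01"
    print(format_timestamp(1))     # e.g. "1970-01-01 00:00:00.001"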
diff --git a/python/tsfile/dataset/merge.py b/python/tsfile/dataset/merge.py
new file mode 100644
index 00000000..1f72290e
--- /dev/null
+++ b/python/tsfile/dataset/merge.py
@@ -0,0 +1,154 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""Merge helpers for dataset reads.
+
+The dataset package enforces a strict cross-shard merge policy:
+- only numeric-compatible field columns are exposed,
+- null numeric values are represented as ``NaN``,
+- duplicate timestamps for the same logical series across shards are rejected.
+"""
+
+import heapq
+from typing import Dict, List, Tuple
+
+import numpy as np
+
+
+def merge_timestamp_parts(
+ time_parts: List[np.ndarray],
+ *,
+ deduplicate: bool = False,
+ validate_unique: bool = False,
+) -> np.ndarray:
+ """Merge sorted timestamp parts with optional deduplication or validation."""
+ parts = [ts_part for ts_part in time_parts if len(ts_part) > 0]
+ if not parts:
+ return np.array([], dtype=np.int64)
+ if len(parts) == 1:
+ return parts[0]
+
+ parts.sort(key=lambda ts_part: int(ts_part[0]))
+ if all(int(parts[idx - 1][-1]) < int(parts[idx][0]) for idx in range(1, len(parts))):
+ return np.concatenate(parts)
+
+ total_length = sum(len(ts_part) for ts_part in parts)
+ merged = np.empty(total_length, dtype=np.int64)
+
+ heap = [(int(ts_part[0]), part_idx, 0) for part_idx, ts_part in enumerate(parts)]
+ heapq.heapify(heap)
+
+ out_idx = 0
+ last_ts = None
+ while heap:
+ ts, part_idx, offset = heapq.heappop(heap)
+
+ if last_ts is not None and ts == last_ts:
+ if validate_unique:
+ raise ValueError(f"Duplicate timestamp {ts} found across shards.")
+ if not deduplicate:
+ merged[out_idx] = ts
+ out_idx += 1
+ else:
+ merged[out_idx] = ts
+ out_idx += 1
+ last_ts = ts
+
+ next_offset = offset + 1
+ if next_offset < len(parts[part_idx]):
+ heapq.heappush(heap, (int(parts[part_idx][next_offset]), part_idx, next_offset))
+
+ if validate_unique:
+ return merged[:out_idx]
+ if deduplicate:
+ return merged[:out_idx]
+ return merged[:out_idx]
+
+
+def merge_time_value_parts(
+ time_parts: List[np.ndarray],
+ value_parts: List[np.ndarray],
+) -> Tuple[np.ndarray, np.ndarray]:
+ """Merge sorted time/value parts for one logical series.
+
+ Duplicate timestamps are validated during metadata loading, so the query
+ path can assume each part is already sorted and conflict-free.
+
+ Fast path: if shard ranges do not overlap in time, concatenate in shard
+ order after sorting parts by their first timestamp.
+ Fallback: use a k-way merge for overlapping-but-disjoint ranges.
+ """
+ parts = [(ts_part, val_part) for ts_part, val_part in zip(time_parts, value_parts) if len(ts_part) > 0]
+ if not parts:
+ return np.array([], dtype=np.int64), np.array([], dtype=np.float64)
+ if len(parts) == 1:
+ return parts[0]
+
+ parts.sort(key=lambda item: int(item[0][0]))
+ time_parts = [ts_part for ts_part, _ in parts]
+ value_parts = [val_part for _, val_part in parts]
+
+ if all(int(time_parts[idx - 1][-1]) < int(time_parts[idx][0]) for idx in range(1, len(time_parts))):
+ return np.concatenate(time_parts), np.concatenate(value_parts)
+
+ total_length = sum(len(ts_part) for ts_part in time_parts)
+ merged_ts = np.empty(total_length, dtype=np.int64)
+ merged_vals = np.empty(total_length, dtype=np.float64)
+
+ heap = [(int(ts_part[0]), part_idx, 0) for part_idx, ts_part in enumerate(time_parts)]
+ heapq.heapify(heap)
+
+ out_idx = 0
+ while heap:
+ _, part_idx, offset = heapq.heappop(heap)
+ merged_ts[out_idx] = time_parts[part_idx][offset]
+ merged_vals[out_idx] = value_parts[part_idx][offset]
+ out_idx += 1
+
+ next_offset = offset + 1
+ if next_offset < len(time_parts[part_idx]):
+ heapq.heappush(heap, (int(time_parts[part_idx][next_offset]), part_idx, next_offset))
+
+ return merged_ts, merged_vals
+
+
+def build_aligned_matrix(
+ series_names: List[str], series_data: Dict[str, Tuple[np.ndarray, np.ndarray]]
+) -> Tuple[np.ndarray, np.ndarray]:
+ """Build a timestamp union and aligned value matrix for multiple series.
+
+ Each input series is assumed to already satisfy the dataset merge policy,
+ meaning its timestamp array is unique within that logical series.
+ """
+ all_ts_arrays = [ts for ts, _ in series_data.values() if len(ts) > 0]
+ if not all_ts_arrays:
+ return np.array([], dtype=np.int64), np.empty((0, len(series_names)))
+
+ timestamps = merge_timestamp_parts(all_ts_arrays, deduplicate=True)
+ values = np.full((len(timestamps), len(series_names)), np.nan)
+
+ for col_idx, name in enumerate(series_names):
+ if name not in series_data:
+ continue
+ ts_arr, val_arr = series_data[name]
+ if len(ts_arr) == 0:
+ continue
+ indices = np.searchsorted(timestamps, ts_arr)
+ values[indices, col_idx] = val_arr
+
+ return timestamps, values
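A small sketch of the merge policy implemented above (array values are illustrative):

    import numpy as np
    from tsfile.dataset.merge import build_aligned_matrix, merge_timestamp_parts

    # Non-overlapping shard ranges take the concatenation fast path.
    merge_timestamp_parts([np.array([0, 1]), np.array([2, 3])])
    # -> array([0, 1, 2, 3])

    # Duplicate timestamps across shards are rejected when validation is requested.
    # merge_timestamp_parts([np.array([0, 1]), np.array([1, 2])], validate_unique=True)
    # -> ValueError: Duplicate timestamp 1 found across shards.

    # build_aligned_matrix unions the timestamps and fills missing cells with NaN.
    ts, values = build_aligned_matrix(
        ["s1", "s2"],
        {"s1": (np.array([0, 1]), np.array([10.0, 11.0])),
         "s2": (np.array([1, 2]), np.array([20.0, 21.0]))},
    )
    # ts -> [0, 1, 2]; values[:, 0] -> [10.0, 11.0, nan]; values[:, 1] -> [nan, 20.0, 21.0]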
diff --git a/python/tsfile/dataset/metadata.py b/python/tsfile/dataset/metadata.py
new file mode 100644
index 00000000..5c4611e0
--- /dev/null
+++ b/python/tsfile/dataset/metadata.py
@@ -0,0 +1,227 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""Shared metadata models for dataset readers and views."""
+
+from dataclasses import dataclass, field
+from typing import Any, Dict, Iterable, Iterator, List, Tuple
+
+import numpy as np
+
+from ..constants import TSDataType
+
+
+_PATH_SEPARATOR = "."
+_PATH_ESCAPE = "\\"
+
+
+@dataclass(slots=True)
+class TableEntry:
+ """Schema-level metadata shared by every device in one table."""
+
+ table_name: str
+ tag_columns: Tuple[str, ...]
+ tag_types: Tuple[TSDataType, ...]
+ field_columns: Tuple[str, ...]
+ _field_index_by_name: Dict[str, int] = field(init=False, repr=False)
+
+ def __post_init__(self):
+ self._field_index_by_name = {column: idx for idx, column in enumerate(self.field_columns)}
+
+ def get_field_index(self, field_name: str) -> int:
+ if field_name not in self._field_index_by_name:
+ raise ValueError(f"Field not found in table '{self.table_name}': {field_name}")
+ return self._field_index_by_name[field_name]
+
+
+@dataclass(slots=True)
+class DeviceEntry:
+ """One logical device identified by table_id + ordered tag values.
+
+ The table_id refers to MetadataCatalog.table_entries[table_id].
+ """
+
+ table_id: int
+ tag_values: Tuple[Any, ...]
+ timestamps: np.ndarray
+ length: int
+ min_time: int
+ max_time: int
+
+
+@dataclass(slots=True)
+class MetadataCatalog:
+ """Canonical metadata store shared by dataset readers and dataframes."""
+
+ table_entries: List[TableEntry] = field(default_factory=list)
+ device_entries: List[DeviceEntry] = field(default_factory=list)
+ table_id_by_name: Dict[str, int] = field(default_factory=dict)
+ device_id_by_key: Dict[Tuple[int, tuple], int] = field(default_factory=dict)
+
+ def add_table(
+ self,
+ table_name: str,
+ tag_columns: Iterable[str],
+ tag_types: Iterable[TSDataType],
+ field_columns: Iterable[str],
+ ) -> int:
+ table_id = len(self.table_entries)
+ self.table_entries.append(
+ TableEntry(
+ table_name=table_name,
+ tag_columns=tuple(tag_columns),
+ tag_types=tuple(tag_types),
+ field_columns=tuple(field_columns),
+ )
+ )
+ self.table_id_by_name[table_name] = table_id
+ return table_id
+
+ def add_device(self, table_id: int, tag_values: tuple, timestamps: np.ndarray) -> int:
+ key = (table_id, tuple(tag_values))
+ if key in self.device_id_by_key:
+ return self.device_id_by_key[key]
+
+ timestamps = np.sort(timestamps)
+ if len(timestamps) == 0:
+ raise ValueError("Cannot register a device without timestamps.")
+
+ device_id = len(self.device_entries)
+ self.device_entries.append(
+ DeviceEntry(
+ table_id=table_id,
+ tag_values=tuple(tag_values),
+ timestamps=timestamps,
+ length=len(timestamps),
+ min_time=int(timestamps[0]),
+ max_time=int(timestamps[-1]),
+ )
+ )
+ self.device_id_by_key[key] = device_id
+ return device_id
+
+ @property
+ def series_count(self) -> int:
+ return sum(len(self.table_entries[device.table_id].field_columns) for device in self.device_entries)
+
+
+def _escape_path_component(value: Any) -> str:
+ return str(value).replace(_PATH_ESCAPE, _PATH_ESCAPE * 2).replace(_PATH_SEPARATOR, _PATH_ESCAPE + _PATH_SEPARATOR)
+
+
+def split_logical_series_path(series_path: str) -> List[str]:
+ parts = []
+ current = []
+ escaping = False
+
+ for char in series_path:
+ if escaping:
+ current.append(char)
+ escaping = False
+ continue
+ if char == _PATH_ESCAPE:
+ escaping = True
+ continue
+ if char == _PATH_SEPARATOR:
+ parts.append("".join(current))
+ current = []
+ continue
+ current.append(char)
+
+ if escaping:
+ raise ValueError(f"Invalid series path: {series_path}")
+
+ parts.append("".join(current))
+ return parts
+
+
+def build_logical_series_path(table_name: str, tag_values: Iterable[Any], field_name: str) -> str:
+ components = [table_name, *tag_values, field_name]
+ return _PATH_SEPARATOR.join(_escape_path_component(component) for component in components)
+
+
+def build_series_path(catalog: MetadataCatalog, device_id: int, field_idx: int) -> str:
+ """Return the external logical series name for one device field."""
+ device_entry = catalog.device_entries[device_id]
+ table_entry = catalog.table_entries[device_entry.table_id]
+ field_name = table_entry.field_columns[field_idx]
+ return build_logical_series_path(table_entry.table_name, device_entry.tag_values, field_name)
+
+
+def iter_series_refs(catalog: MetadataCatalog) -> Iterator[Tuple[int, int]]:
+ """Yield ``(device_id, field_idx)`` pairs in catalog order."""
+ for device_id, device_entry in enumerate(catalog.device_entries):
+ table_entry = catalog.table_entries[device_entry.table_id]
+ for field_idx in range(len(table_entry.field_columns)):
+ yield device_id, field_idx
+
+
+def iter_series_paths(catalog: MetadataCatalog) -> Iterator[str]:
+ """Yield logical series names in catalog order."""
+ for device_id, field_idx in iter_series_refs(catalog):
+ yield build_series_path(catalog, device_id, field_idx)
+
+
+def resolve_series_path(catalog: MetadataCatalog, series_path: str) -> Tuple[int, int, int]:
+ """Resolve an external path to ``(table_id, device_id, field_idx)``."""
+ parts = split_logical_series_path(series_path)
+ if len(parts) < 2:
+ raise ValueError(f"Invalid series path: {series_path}")
+
+ table_name = parts[0]
+ if table_name not in catalog.table_id_by_name:
+ raise ValueError(f"Series not found: {series_path}")
+
+ table_id = catalog.table_id_by_name[table_name]
+ table_entry = catalog.table_entries[table_id]
+ expected_parts = len(table_entry.tag_columns) + 2
+ if len(parts) != expected_parts:
+ raise ValueError(f"Series not found: {series_path}")
+
+ field_name = parts[-1]
+ try:
+ field_idx = table_entry.get_field_index(field_name)
+ except ValueError as exc:
+ raise ValueError(f"Series not found: {series_path}") from exc
+
+ tag_values = tuple(
+ _coerce_path_component(raw_value, tag_type)
+ for raw_value, tag_type in zip(parts[1:-1], table_entry.tag_types)
+ )
+ key = (table_id, tag_values)
+ if key not in catalog.device_id_by_key:
+ raise ValueError(f"Series not found: {series_path}")
+
+ return table_id, catalog.device_id_by_key[key], field_idx
+
+
+def _coerce_path_component(value: str, data_type: TSDataType) -> Any:
+ if data_type in {TSDataType.STRING, TSDataType.TEXT, TSDataType.BLOB}:
+ return value
+ if data_type == TSDataType.BOOLEAN:
+ lowered = value.lower()
+ if lowered == "true":
+ return True
+ if lowered == "false":
+ return False
+ raise ValueError(f"Invalid boolean tag value: {value}")
+ if data_type in {TSDataType.INT32, TSDataType.INT64, TSDataType.TIMESTAMP,
TSDataType.DATE}:
+ return int(value)
+ if data_type in {TSDataType.FLOAT, TSDataType.DOUBLE}:
+ return float(value)
+ return value
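As a quick illustration of the escaping contract implemented by the path helpers above, here is a minimal sketch (it assumes the file python/tsfile/dataset/metadata.py is importable as tsfile.dataset.metadata; the table, tag, and field names are made up):

    from tsfile.dataset.metadata import build_logical_series_path, split_logical_series_path

    # Components are escaped before joining, so tag values that happen to
    # contain the path separator or escape character still round-trip.
    path = build_logical_series_path("weather", ["device_a"], "temperature")
    assert split_logical_series_path(path) == ["weather", "device_a", "temperature"]
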
diff --git a/python/tsfile/dataset/reader.py b/python/tsfile/dataset/reader.py
new file mode 100644
index 00000000..365dc884
--- /dev/null
+++ b/python/tsfile/dataset/reader.py
@@ -0,0 +1,354 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""Single-file reader backend used by TsFileDataFrame."""
+
+import os
+import sys
+from typing import Dict, Iterator, List, Tuple
+
+import numpy as np
+import pyarrow.compute as pc
+
+from ..constants import ColumnCategory, TSDataType
+from ..tsfile_reader import TsFileReaderPy
+from .metadata import MetadataCatalog, build_series_path, iter_series_refs,
resolve_series_path
+
+
+_NUMERIC_FIELD_TYPES = {
+ TSDataType.BOOLEAN,
+ TSDataType.INT32,
+ TSDataType.INT64,
+ TSDataType.FLOAT,
+ TSDataType.DOUBLE,
+ TSDataType.TIMESTAMP,
+}
+
+
+def _to_python_scalar(value):
+ return value.item() if hasattr(value, "item") else value
+
+
+class TsFileSeriesReader:
+ """Wrap ``TsFileReaderPy`` with numeric dataset discovery and batch
reads."""
+
+ def __init__(self, file_path: str, show_progress: bool = True):
+ if not os.path.exists(file_path):
+ raise FileNotFoundError(f"TsFile not found: {file_path}")
+
+ self.file_path = file_path
+ self.show_progress = show_progress
+
+ try:
+ self._reader = TsFileReaderPy(file_path)
+ except Exception as e:
+ raise ValueError(f"Failed to open TsFile: {e}") from e
+
+ self._catalog = MetadataCatalog()
+ self._cache_metadata()
+
+ def __del__(self):
+ self.close()
+
+ @property
+ def catalog(self) -> MetadataCatalog:
+ return self._catalog
+
+ @property
+ def series_paths(self) -> List[str]:
+ return list(self.iter_series_paths())
+
+ @property
+ def series_count(self) -> int:
+ return self._catalog.series_count
+
+ def iter_series_paths(self) -> Iterator[str]:
+ for device_id, field_idx in iter_series_refs(self._catalog):
+ yield build_series_path(self._catalog, device_id, field_idx)
+
+ def iter_series_refs(self) -> Iterator[Tuple[str, int, int]]:
+ for device_id, field_idx in iter_series_refs(self._catalog):
+ yield build_series_path(self._catalog, device_id, field_idx),
device_id, field_idx
+
+ def close(self):
+ if hasattr(self, "_reader"):
+ try:
+ self._reader.close()
+ except Exception:
+ pass
+
+ def _cache_metadata(self):
+ """Wrap metadata discovery so reader construction surfaces one stable
error shape."""
+ try:
+ self._cache_metadata_table_model()
+            # TODO: add support for the tree model as well.
+ except Exception as e:
+ raise ValueError(
+ f"Failed to read TsFile metadata. Please ensure the TsFile is
valid and readable. Error: {e}"
+ ) from e
+
+ def _cache_metadata_table_model(self):
+ """Build the in-memory catalog by scanning table batches from the
file."""
+ table_schemas = self._reader.get_all_table_schemas()
+ if not table_schemas:
+ raise ValueError("No tables found in TsFile")
+
+ self._catalog = MetadataCatalog()
+ total_rows = 0
+ table_names = list(table_schemas.keys())
+
+ for table_index, table_name in enumerate(table_names):
+ table_schema = table_schemas[table_name]
+
+ tag_columns = []
+ tag_types = []
+ field_columns = []
+ for column_schema in table_schema.get_columns():
+ column_name = column_schema.get_column_name()
+ column_category = column_schema.get_category()
+ if column_category == ColumnCategory.TIME:
+ continue
+ if column_category == ColumnCategory.TAG:
+ tag_columns.append(column_name)
+ tag_types.append(column_schema.get_data_type())
+
+                # Ignore fields that are not numeric; we do not use them currently.
+ elif (
+ column_category == ColumnCategory.FIELD
+ and column_schema.get_data_type() in _NUMERIC_FIELD_TYPES
+ ):
+ field_columns.append(column_name)
+
+ if not field_columns:
+ continue
+
+ table_id = self._catalog.add_table(table_name, tag_columns,
tag_types, field_columns)
+ time_arrays = []
+ tag_arrays = {tag_column: [] for tag_column in tag_columns}
+
+            # [Temporary] This will be replaced by a new TsFile API so that we no longer query all the data here.
+ query_columns = tag_columns + field_columns
+
+ with self._reader.query_table_batch(table_name, query_columns,
batch_size=65536) as result_set:
+ while True:
+ arrow_table = result_set.read_arrow_batch()
+ if arrow_table is None:
+ break
+ batch_rows = arrow_table.num_rows
+ total_rows += batch_rows
+ time_arrays.append(arrow_table.column("time").to_numpy())
+ for tag_column in tag_columns:
+                        tag_arrays[tag_column].append(arrow_table.column(tag_column).to_numpy())
+
+ if self.show_progress:
+ sys.stderr.write(
+ f"\rReading TsFile metadata: table {table_index +
1}/{len(table_names)} "
+ f"[{table_name}] ({total_rows:,} rows)"
+ )
+ sys.stderr.flush()
+
+ if not time_arrays:
+ continue
+
+ timestamps = np.concatenate(time_arrays).astype(np.int64)
+ if not tag_columns:
+ self._add_device(table_id, (), timestamps)
+ continue
+
+            for tag_values, device_timestamps in self._iter_device_groups(tag_columns, timestamps, tag_arrays):
+ self._add_device(table_id, tag_values, device_timestamps)
+
+ if self.show_progress and total_rows > 0:
+ sys.stderr.write(
+ f"\rReading TsFile metadata: {len(table_names)} table(s),
{total_rows:,} rows, "
+ f"{self.series_count} series ... done\n"
+ )
+ sys.stderr.flush()
+
+ if self.series_count == 0:
+ raise ValueError("No valid numeric series found in TsFile")
+
+ def _iter_device_groups(
+ self,
+ tag_columns: List[str],
+ timestamps: np.ndarray,
+ tag_arrays: Dict[str, list],
+ ) -> Iterator[Tuple[tuple, np.ndarray]]:
+ """Group one table's rows by tag tuple while preserving original row
membership."""
+        tag_values_by_column = {column: np.concatenate(tag_arrays[column]) for column in tag_columns}
+
+ n = len(timestamps)
+ arrays = [tag_values_by_column[col] for col in tag_columns]
+        dtype = np.dtype([(col, arrays[i].dtype) for i, col in enumerate(tag_columns)])
+ composite = np.empty(n, dtype=dtype)
+ for i, col in enumerate(tag_columns):
+ composite[col] = arrays[i]
+
+ _, inverse, counts = np.unique(composite, return_inverse=True,
return_counts=True)
+ ordered_indices = np.argsort(inverse, kind="stable")
+ group_bounds = np.cumsum(counts)[:-1]
+ for group_indices in np.split(ordered_indices, group_bounds):
+ first = int(group_indices[0])
+            tag_tuple = tuple(_to_python_scalar(composite[col][first]) for col in tag_columns)
+ yield tag_tuple, timestamps[group_indices]
+
+ def _add_device(
+ self,
+ table_id: int,
+ tag_values: tuple,
+ timestamps: np.ndarray,
+ ):
+ """Add one device to the catalog."""
+ if len(timestamps) == 0:
+ return
+
+ self._catalog.add_device(table_id, tag_values, timestamps)
+
+ def _resolve_series_path(self, series_path: str) -> Tuple[int, int, int]:
+ return resolve_series_path(self._catalog, series_path)
+
+ def _resolve_series_ref(self, device_id: int, field_idx: int):
+ """Resolve a reader-local ref into the table/device metadata needed by
read paths."""
+ device_entry = self._catalog.device_entries[device_id]
+ table_entry = self._catalog.table_entries[device_entry.table_id]
+ field_name = table_entry.field_columns[field_idx]
+ return table_entry, device_entry, field_name
+
+ def get_device_info(self, device_id: int) -> dict:
+ device_entry = self._catalog.device_entries[device_id]
+ table_entry = self._catalog.table_entries[device_entry.table_id]
+ return {
+ "table_name": table_entry.table_name,
+ "tag_columns": table_entry.tag_columns,
+ "tag_values": dict(zip(table_entry.tag_columns,
device_entry.tag_values)),
+ "length": device_entry.length,
+ "min_time": device_entry.min_time,
+ "max_time": device_entry.max_time,
+ }
+
+ def get_device_timestamps(self, device_id: int) -> np.ndarray:
+ return self._catalog.device_entries[device_id].timestamps
+
+ def get_series_info_by_ref(self, device_id: int, field_idx: int) -> dict:
+        table_entry, device_entry, field_name = self._resolve_series_ref(device_id, field_idx)
+ return {
+ "length": device_entry.length,
+ "min_time": device_entry.min_time,
+ "max_time": device_entry.max_time,
+ "table_name": table_entry.table_name,
+ "column_name": field_name,
+ "device_id": device_id,
+ "field_idx": field_idx,
+ "tag_columns": table_entry.tag_columns,
+ "tag_values": dict(zip(table_entry.tag_columns,
device_entry.tag_values)),
+ }
+
+ def get_series_info(self, series_path: str) -> dict:
+ device_id, field_idx = self._resolve_series_path(series_path)[1:]
+ return self.get_series_info_by_ref(device_id, field_idx)
+
+ def get_series_timestamps(self, series_path: str) -> np.ndarray:
+ device_id = self._resolve_series_path(series_path)[1]
+ return self.get_device_timestamps(device_id)
+
+ def read_series_by_ref(self, device_id: int, field_idx: int, start_time:
int, end_time: int) -> Tuple[np.ndarray, np.ndarray]:
+ table_entry, _, field_name = self._resolve_series_ref(device_id,
field_idx)
+        timestamps, field_values = self.read_device_fields_by_time_range(device_id, [field_idx], start_time, end_time)
+ if len(timestamps) == 0:
+ return np.array([], dtype=np.int64), np.array([], dtype=np.float64)
+ return timestamps, field_values[field_name]
+
+ def read_series_by_time_range(self, series_path: str, start_time: int,
end_time: int) -> Tuple[np.ndarray, np.ndarray]:
+ _, device_id, field_idx = self._resolve_series_path(series_path)
+ return self.read_series_by_ref(device_id, field_idx, start_time,
end_time)
+
+ def read_device_fields_by_time_range(
+ self, device_id: int, field_indices: List[int], start_time: int,
end_time: int
+ ) -> Tuple[np.ndarray, Dict[str, np.ndarray]]:
+ """Read one device slice and return the requested field columns keyed
by field name."""
+ device_entry = self._catalog.device_entries[device_id]
+ table_entry = self._catalog.table_entries[device_entry.table_id]
+ requested_field_columns = [table_entry.field_columns[field_idx] for
field_idx in field_indices]
+ timestamps, field_values = self._read_arrow(
+ table_entry.table_name,
+ requested_field_columns,
+ table_entry.tag_columns,
+ dict(zip(table_entry.tag_columns, device_entry.tag_values)),
+ start_time,
+ end_time,
+ )
+ return timestamps, field_values
+
+ def _read_arrow(
+ self,
+ table_name: str,
+ field_columns: List[str],
+ tag_columns: Tuple[str, ...],
+ tag_values: Dict[str, object],
+ start_time: int,
+ end_time: int,
+ ) -> Tuple[np.ndarray, Dict[str, np.ndarray]]:
+ """Execute the underlying table query, then apply tag filtering
client-side."""
+ tag_columns = list(tag_columns)
+ field_columns = list(field_columns)
+        query_columns = tag_columns + field_columns if tag_columns else list(field_columns)
+ timestamp_parts = []
+ field_parts = {field_column: [] for field_column in field_columns}
+
+ with self._reader.query_table_batch(
+ table_name,
+ query_columns,
+ start_time=start_time,
+ end_time=end_time,
+ batch_size=65536,
+ ) as result_set:
+ while True:
+ arrow_table = result_set.read_arrow_batch()
+ if arrow_table is None:
+ break
+
+ if tag_values:
+ mask = None
+ for tag_column, tag_value in tag_values.items():
+ column_mask = pc.equal(arrow_table.column(tag_column),
tag_value)
+ mask = column_mask if mask is None else pc.and_(mask,
column_mask)
+ arrow_table = arrow_table.filter(mask)
+
+ if arrow_table.num_rows == 0:
+ continue
+
+ timestamp_parts.append(arrow_table.column("time").to_numpy())
+ for field_column in field_columns:
+ raw_values = arrow_table.column(field_column).to_numpy()
+ try:
+                        field_parts[field_column].append(np.asarray(raw_values, dtype=np.float64))
+ except (TypeError, ValueError) as e:
+ raise TypeError(
+ f"Field column '{field_column}' in table
'{table_name}' is not numeric-compatible."
+ ) from e
+
+ if not timestamp_parts:
+ return (
+ np.array([], dtype=np.int64),
+ {field_column: np.array([], dtype=np.float64) for field_column
in field_columns},
+ )
+
+ return (
+ np.concatenate(timestamp_parts).astype(np.int64),
+ {field_column: np.concatenate(field_parts[field_column]) for
field_column in field_columns},
+ )
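For orientation, a hypothetical usage sketch of the reader above; the file name is a placeholder, and only methods introduced in this patch are used:

    from tsfile.dataset.reader import TsFileSeriesReader

    reader = TsFileSeriesReader("weather.tsfile", show_progress=False)
    try:
        for series_path in reader.iter_series_paths():
            info = reader.get_series_info(series_path)
            timestamps, values = reader.read_series_by_time_range(
                series_path, info["min_time"], info["max_time"]
            )
            print(series_path, len(timestamps))
    finally:
        reader.close()
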
diff --git a/python/tsfile/dataset/timeseries.py
b/python/tsfile/dataset/timeseries.py
new file mode 100644
index 00000000..35f1614d
--- /dev/null
+++ b/python/tsfile/dataset/timeseries.py
@@ -0,0 +1,155 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""Timeseries handles returned by the dataset package."""
+
+from typing import Callable, List, Optional, Tuple
+
+import numpy as np
+
+from .merge import merge_time_value_parts
+from .formatting import format_aligned_timeseries, format_timestamp
+
+
+class AlignedTimeseries:
+ """Time-aligned multi-series query result with timestamps.
+
+ Returned by ``TsFileDataFrame.loc[...]``. The values matrix is aligned on
+ the union of timestamps from the selected logical series.
+ """
+
+ def __init__(self, timestamps: np.ndarray, values: np.ndarray,
series_names: List[str]):
+ self.timestamps = timestamps
+ self.values = values
+ self.series_names = series_names
+
+ @property
+ def shape(self):
+ return self.values.shape
+
+ def __len__(self):
+ return len(self.timestamps)
+
+ def __getitem__(self, key):
+ return self.values[key]
+
+ def __repr__(self):
+ return format_aligned_timeseries(self.timestamps, self.values,
self.series_names, max_rows=20)
+
+ def show(self, max_rows: Optional[int] = None):
+ print(format_aligned_timeseries(self.timestamps, self.values,
self.series_names, max_rows=max_rows))
+
+
+class Timeseries:
+ """Single logical numeric series with transparent cross-file merging.
+
+ Cross-shard reads follow the dataset merge policy defined in
+ :mod:`tsfile.dataset.merge`: duplicate timestamps across shards are treated
+ as an error rather than being merged implicitly.
+ """
+
+ def __init__(
+ self,
+ name: str,
+ series_refs: list,
+ stats: dict,
+ ensure_open: Callable[[], None],
+ load_timestamps: Callable[[], np.ndarray],
+ ):
+ self._name = name
+ self._series_refs = series_refs
+ self._stats = dict(stats)
+ self._ensure_open = ensure_open
+ self._load_timestamps = load_timestamps
+ self._timestamps = None
+
+ @property
+ def name(self) -> str:
+ return self._name
+
+ @property
+ def timestamps(self) -> np.ndarray:
+ self._ensure_open()
+ if self._timestamps is None:
+ self._timestamps = self._load_timestamps()
+ return self._timestamps
+
+ @property
+ def stats(self) -> dict:
+ return {
+ "start_time": self._stats.get("min_time"),
+ "end_time": self._stats.get("max_time"),
+ "count": self._stats["count"],
+ }
+
+ def __len__(self) -> int:
+ return self._stats["count"]
+
+ def __getitem__(self, key):
+ timestamps = self.timestamps
+ length = len(timestamps)
+
+ if isinstance(key, int):
+ if key < 0:
+ key += length
+ if key < 0 or key >= length:
+ raise IndexError(f"Index {key} out of range [0, {length})")
+ ts = int(timestamps[key])
+ _, values = self._query_time_range(ts, ts)
+ return float(values[0]) if len(values) > 0 else None
+
+ if isinstance(key, slice):
+ requested_ts = timestamps[key]
+ if len(requested_ts) == 0:
+ return np.array([], dtype=np.float64)
+
+ ts_arr, values = self._query_time_range(int(np.min(requested_ts)),
int(np.max(requested_ts)))
+ result = np.full(len(requested_ts), np.nan)
+ if len(ts_arr) > 0:
+ indices = np.searchsorted(ts_arr, requested_ts)
+ valid = (indices < len(ts_arr)) & (
+                ts_arr[np.minimum(indices, len(ts_arr) - 1)] == requested_ts
+ )
+ result[valid] = values[indices[valid]]
+ return result
+
+ raise TypeError(f"Unsupported key type: {type(key)}")
+
+ def _query_time_range(self, start_time: int, end_time: int) ->
Tuple[np.ndarray, np.ndarray]:
+ self._ensure_open()
+ time_parts = []
+ value_parts = []
+ for reader, device_id, field_idx in self._series_refs:
+ device_timestamps = reader.get_device_timestamps(device_id)
+            if device_timestamps[-1] < start_time or device_timestamps[0] > end_time:
+ continue
+ ts_arr, val_arr = reader.read_series_by_ref(device_id, field_idx,
start_time, end_time)
+ if len(ts_arr) > 0:
+ time_parts.append(ts_arr)
+ value_parts.append(val_arr)
+ return merge_time_value_parts(time_parts, value_parts)
+
+ def __repr__(self):
+ stats = self.stats
+ if stats["count"] == 0:
+ return f"Timeseries('{self._name}', count=0)"
+ return (
+ f"Timeseries('{self._name}', count={stats['count']}, "
+ f"start={format_timestamp(stats['start_time'])}, "
+ f"end={format_timestamp(stats['end_time'])})"
+ )