This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git


The following commit(s) were added to refs/heads/develop by this push:
     new 21d70d65 Support TsfileDataFrame (#765)
21d70d65 is described below

commit 21d70d65764386548ed6fd9fd4c428346db8003e
Author: YangCaiyin <[email protected]>
AuthorDate: Wed Apr 8 12:04:54 2026 +0800

    Support TsfileDataFrame (#765)
    
    * support tsfileDataFrame
    
    * make tsfileDataFrame a module
    
    * support metadata management and optimize implementation
    
    * support show() method in tsdf
    
    * fix typo and address close problem
---
 python/setup.py                     |   2 +-
 python/tests/test_tsfile_dataset.py | 397 ++++++++++++++++++++++
 python/tsfile/__init__.py           |   3 +-
 python/tsfile/dataset/__init__.py   |  24 ++
 python/tsfile/dataset/dataframe.py  | 635 ++++++++++++++++++++++++++++++++++++
 python/tsfile/dataset/formatting.py | 171 ++++++++++
 python/tsfile/dataset/merge.py      | 154 +++++++++
 python/tsfile/dataset/metadata.py   | 227 +++++++++++++
 python/tsfile/dataset/reader.py     | 354 ++++++++++++++++++++
 python/tsfile/dataset/timeseries.py | 155 +++++++++
 10 files changed, 2120 insertions(+), 2 deletions(-)
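
For reviewers, the new dataset API can be exercised roughly as follows. This is a sketch distilled from the tests added in python/tests/test_tsfile_dataset.py; the shard paths are illustrative and not part of the commit.

    from tsfile import TsFileDataFrame

    # Open one or more TsFile shards; a directory containing *.tsfile files also works.
    with TsFileDataFrame(["part1.tsfile", "part2.tsfile"], show_progress=False) as tsdf:
        print(tsdf.list_timeseries())    # logical paths such as "weather.device_a.temperature"
        series = tsdf[0]                 # Timeseries by position, or tsdf["weather.device_a.temperature"]
        aligned = tsdf.loc[0:5, [0, 1]]  # AlignedTimeseries over a time range and a column list
        aligned.show(2)                  # head/tail preview of the aligned matrix
    # The context manager closes the readers; close() on a subset view is a warned no-op.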

diff --git a/python/setup.py b/python/setup.py
index 759abab3..16fc7aa8 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -144,7 +144,7 @@ exts = [
 setup(
     name="tsfile",
     version=version,
-    packages=["tsfile"],
+    packages=["tsfile", "tsfile.dataset"],
     package_dir={"": "."},
     include_package_data=True,
     ext_modules=cythonize(exts, compiler_directives={"language_level": 3}),
diff --git a/python/tests/test_tsfile_dataset.py b/python/tests/test_tsfile_dataset.py
new file mode 100644
index 00000000..63cd439a
--- /dev/null
+++ b/python/tests/test_tsfile_dataset.py
@@ -0,0 +1,397 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import numpy as np
+import pandas as pd
+import pytest
+
+from tsfile import ColumnCategory, ColumnSchema, TSDataType, TableSchema, TsFileTableWriter
+from tsfile import AlignedTimeseries, Timeseries, TsFileDataFrame
+from tsfile.dataset.formatting import format_timestamp
+from tsfile.dataset.reader import TsFileSeriesReader
+
+
+def _write_weather_file(path, start):
+    schema = TableSchema(
+        "weather",
+        [
+            ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
+            ColumnSchema("temperature", TSDataType.DOUBLE, 
ColumnCategory.FIELD),
+            ColumnSchema("humidity", TSDataType.DOUBLE, ColumnCategory.FIELD),
+        ],
+    )
+    df = pd.DataFrame(
+        {
+            "time": [start, start + 1, start + 2],
+            "device": ["device_a", "device_a", "device_a"],
+            "temperature": [20.0, 21.5, 23.0],
+            "humidity": [50.0, 52.0, 55.0],
+        }
+    )
+    with TsFileTableWriter(str(path), schema) as writer:
+        writer.write_dataframe(df)
+
+
+def _write_numeric_and_text_file(path):
+    schema = TableSchema(
+        "weather",
+        [
+            ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
+            ColumnSchema("temperature", TSDataType.DOUBLE, 
ColumnCategory.FIELD),
+            ColumnSchema("status", TSDataType.STRING, ColumnCategory.FIELD),
+        ],
+    )
+    df = pd.DataFrame(
+        {
+            "time": [0, 1, 2],
+            "device": ["device_a", "device_a", "device_a"],
+            "temperature": [20.0, np.nan, 23.5],
+            "status": ["ok", "warn", "ok"],
+        }
+    )
+    with TsFileTableWriter(str(path), schema) as writer:
+        writer.write_dataframe(df)
+
+
+def _write_partial_numeric_rows_file(path):
+    schema = TableSchema(
+        "weather",
+        [
+            ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
+            ColumnSchema("temperature", TSDataType.DOUBLE, 
ColumnCategory.FIELD),
+            ColumnSchema("humidity", TSDataType.DOUBLE, ColumnCategory.FIELD),
+        ],
+    )
+    df = pd.DataFrame(
+        {
+            "time": [0, 1],
+            "device": ["device_a", "device_a"],
+            "temperature": [np.nan, 21.0],
+            "humidity": [50.0, 51.0],
+        }
+    )
+    with TsFileTableWriter(str(path), schema) as writer:
+        writer.write_dataframe(df)
+
+
+def _write_weather_with_extra_field_file(path, start):
+    schema = TableSchema(
+        "weather",
+        [
+            ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
+            ColumnSchema("temperature", TSDataType.DOUBLE, 
ColumnCategory.FIELD),
+            ColumnSchema("humidity", TSDataType.DOUBLE, ColumnCategory.FIELD),
+            ColumnSchema("pressure", TSDataType.DOUBLE, ColumnCategory.FIELD),
+        ],
+    )
+    df = pd.DataFrame(
+        {
+            "time": [start, start + 1],
+            "device": ["device_a", "device_a"],
+            "temperature": [20.0, 21.0],
+            "humidity": [50.0, 51.0],
+            "pressure": [1000.0, 1001.0],
+        }
+    )
+    with TsFileTableWriter(str(path), schema) as writer:
+        writer.write_dataframe(df)
+
+
+def _write_multi_tag_file(path):
+    schema = TableSchema(
+        "weather",
+        [
+            ColumnSchema("city", TSDataType.STRING, ColumnCategory.TAG),
+            ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
+            ColumnSchema("temperature", TSDataType.DOUBLE, 
ColumnCategory.FIELD),
+            ColumnSchema("humidity", TSDataType.DOUBLE, ColumnCategory.FIELD),
+            ColumnSchema("status", TSDataType.STRING, ColumnCategory.FIELD),
+        ],
+    )
+    df = pd.DataFrame(
+        {
+            "time": [0, 1, 0, 1],
+            "city": ["beijing", "beijing", "shanghai", "shanghai"],
+            "device": ["device_a", "device_a", "device_b", "device_b"],
+            "temperature": [20.0, 21.0, 24.0, 25.0],
+            "humidity": [50.0, 51.0, 60.0, 61.0],
+            "status": ["ok", "ok", "warn", "warn"],
+        }
+    )
+    with TsFileTableWriter(str(path), schema) as writer:
+        writer.write_dataframe(df)
+
+
+def _write_special_tag_file(path):
+    schema = TableSchema(
+        "weather",
+        [
+            ColumnSchema("city", TSDataType.STRING, ColumnCategory.TAG),
+            ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
+            ColumnSchema("temperature", TSDataType.DOUBLE, 
ColumnCategory.FIELD),
+        ],
+    )
+    df = pd.DataFrame(
+        {
+            "time": [0, 1],
+            "city": ["bei.jing", "bei.jing"],
+            "device": [r"dev\1", r"dev\1"],
+            "temperature": [20.0, 21.0],
+        }
+    )
+    with TsFileTableWriter(str(path), schema) as writer:
+        writer.write_dataframe(df)
+
+
+def test_dataset_top_level_imports():
+    assert TsFileDataFrame.__module__ == "tsfile.dataset.dataframe"
+    assert Timeseries.__module__ == "tsfile.dataset.timeseries"
+    assert AlignedTimeseries.__module__ == "tsfile.dataset.timeseries"
+
+
+def test_format_timestamp_preserves_millisecond_precision():
+    assert "." not in format_timestamp(1000)
+    assert format_timestamp(1).endswith(".001")
+
+
+def test_dataset_basic_access_patterns(tmp_path, capsys):
+    path1 = tmp_path / "part1.tsfile"
+    path2 = tmp_path / "part2.tsfile"
+    _write_weather_file(path1, 0)
+    _write_weather_file(path2, 3)
+
+    with TsFileDataFrame([str(path1), str(path2)], show_progress=False) as tsdf:
+        assert len(tsdf) == 2
+
+        first = tsdf[0]
+        assert isinstance(first, Timeseries)
+        assert first.name in tsdf.list_timeseries()
+        assert len(first) == 6
+        assert first[0] == 20.0
+        assert first[-1] == 23.0
+        assert "Timeseries(" in repr(first)
+
+        by_name = tsdf[first.name]
+        assert isinstance(by_name, Timeseries)
+        assert by_name.name == first.name
+
+        subset = tsdf[:1]
+        assert isinstance(subset, TsFileDataFrame)
+        assert len(subset) == 1
+
+        selected = tsdf[[0, 1]]
+        assert isinstance(selected, TsFileDataFrame)
+        assert len(selected) == 2
+
+        aligned = tsdf.loc[0:5, [0, 1]]
+        assert isinstance(aligned, AlignedTimeseries)
+        assert aligned.shape == (6, 2)
+
+        aligned_negative = tsdf.loc[0:5, [-1]]
+        assert isinstance(aligned_negative, AlignedTimeseries)
+        assert aligned_negative.shape == (6, 1)
+
+        assert list(tsdf["field"]) == ["temperature", "humidity"]
+
+        assert "TsFileDataFrame(2 time series, 2 files)" in repr(tsdf)
+        aligned.show(2)
+        assert "AlignedTimeseries(6 rows, 2 series)" in capsys.readouterr().out
+
+
+def test_dataset_exposes_only_numeric_fields_and_keeps_nan(tmp_path):
+    path = tmp_path / "numeric_and_text.tsfile"
+    _write_numeric_and_text_file(path)
+
+    with TsFileDataFrame(str(path), show_progress=False) as tsdf:
+        assert tsdf.list_timeseries() == ["weather.device_a.temperature"]
+
+        series = tsdf[0]
+        assert series.name == "weather.device_a.temperature"
+        assert np.isnan(series[1])
+        sliced = series[:3]
+        assert sliced.shape == (3,)
+        assert np.isnan(sliced[1])
+        assert series[1:1].shape == (0,)
+
+
+def test_dataset_timeseries_supports_negative_step_slices(tmp_path):
+    path = tmp_path / "weather.tsfile"
+    _write_weather_file(path, 0)
+
+    with TsFileDataFrame(str(path), show_progress=False) as tsdf:
+        series = tsdf[0]
+        np.testing.assert_array_equal(series[::-1], np.array([23.0, 21.5, 20.0]))
+        np.testing.assert_array_equal(series[::-2], np.array([23.0, 20.0]))
+
+
+def test_dataset_metadata_discovery_uses_all_numeric_fields(tmp_path):
+    path = tmp_path / "partial_numeric_rows.tsfile"
+    _write_partial_numeric_rows_file(path)
+
+    with TsFileDataFrame(str(path), show_progress=False) as tsdf:
+        assert tsdf.list_timeseries() == [
+            "weather.device_a.temperature",
+            "weather.device_a.humidity",
+        ]
+
+        assert list(tsdf["count"]) == [2, 2]
+        assert list(tsdf["start_time"]) == [0, 0]
+        assert list(tsdf["end_time"]) == [1, 1]
+
+
+def test_dataset_rejects_duplicate_timestamps_across_shards(tmp_path):
+    path1 = tmp_path / "part1.tsfile"
+    path2 = tmp_path / "part2.tsfile"
+    _write_weather_file(path1, 0)
+    _write_weather_file(path2, 2)
+
+    with pytest.raises(ValueError, match="Duplicate timestamp"):
+        TsFileDataFrame([str(path1), str(path2)], show_progress=False)
+
+
+def test_dataset_rejects_data_access_after_close(tmp_path):
+    path = tmp_path / "weather.tsfile"
+    _write_weather_file(path, 0)
+
+    tsdf = TsFileDataFrame(str(path), show_progress=False)
+    series = tsdf[0]
+    tsdf.close()
+
+    with pytest.raises(RuntimeError, match="TsFileDataFrame is closed"):
+        _ = tsdf[0]
+
+    with pytest.raises(RuntimeError, match="TsFileDataFrame is closed"):
+        _ = series[0]
+
+
+def test_subset_close_warns_and_does_not_close_root(tmp_path):
+    path = tmp_path / "weather.tsfile"
+    _write_weather_file(path, 0)
+
+    with TsFileDataFrame(str(path), show_progress=False) as tsdf:
+        subset = tsdf[:1]
+        with pytest.warns(RuntimeWarning, match="no-op"):
+            subset.close()
+
+        series = tsdf[0]
+        assert series[0] == 20.0
+
+
+def test_dataset_rejects_incompatible_table_schemas_across_shards(tmp_path):
+    path1 = tmp_path / "part1.tsfile"
+    path2 = tmp_path / "part2.tsfile"
+    _write_weather_file(path1, 0)
+    _write_weather_with_extra_field_file(path2, 2)
+
+    with pytest.raises(ValueError, match="Incompatible schema for table 'weather'"):
+        TsFileDataFrame([str(path1), str(path2)], show_progress=False)
+
+
+def test_dataset_multi_tag_metadata_discovery(tmp_path):
+    path = tmp_path / "multi_tag.tsfile"
+    _write_multi_tag_file(path)
+
+    with TsFileDataFrame(str(path), show_progress=False) as tsdf:
+        assert tsdf.list_timeseries() == [
+            "weather.beijing.device_a.temperature",
+            "weather.beijing.device_a.humidity",
+            "weather.shanghai.device_b.temperature",
+            "weather.shanghai.device_b.humidity",
+        ]
+
+        summary = pd.DataFrame(
+            {
+                "series_path": tsdf.list_timeseries(),
+                "table": tsdf["table"],
+                "city": tsdf["city"],
+                "device": tsdf["device"],
+                "field": tsdf["field"],
+                "start_time": tsdf["start_time"],
+                "end_time": tsdf["end_time"],
+                "count": tsdf["count"],
+            }
+        ).sort_values(["city", "device", "field"]).reset_index(drop=True)
+        assert list(summary.columns) == [
+            "series_path",
+            "table",
+            "city",
+            "device",
+            "field",
+            "start_time",
+            "end_time",
+            "count",
+        ]
+        assert list(summary["city"]) == ["beijing", "beijing", "shanghai", 
"shanghai"]
+        assert list(summary["device"]) == ["device_a", "device_a", "device_b", 
"device_b"]
+        assert list(summary["field"]) == ["humidity", "temperature", 
"humidity", "temperature"]
+        assert list(summary["count"]) == [2, 2, 2, 2]
+
+
+def test_dataset_series_paths_escape_special_tag_values(tmp_path):
+    path = tmp_path / "special_tag.tsfile"
+    _write_special_tag_file(path)
+
+    with TsFileDataFrame(str(path), show_progress=False) as tsdf:
+        expected_path = r"weather.bei\.jing.dev\\1.temperature"
+        assert tsdf.list_timeseries() == [expected_path]
+
+        series = tsdf[expected_path]
+        assert isinstance(series, Timeseries)
+        assert series.name == expected_path
+        assert list(tsdf["city"]) == ["bei.jing"]
+        assert list(tsdf["device"]) == [r"dev\1"]
+
+
+def test_reader_series_paths_escape_special_tag_values(tmp_path):
+    path = tmp_path / "special_tag.tsfile"
+    _write_special_tag_file(path)
+
+    reader = TsFileSeriesReader(str(path), show_progress=False)
+    try:
+        expected_path = r"weather.bei\.jing.dev\\1.temperature"
+        assert reader.series_paths == [expected_path]
+        info = reader.get_series_info(expected_path)
+        assert info["tag_values"] == {"city": "bei.jing", "device": r"dev\1"}
+    finally:
+        reader.close()
+
+
+def test_reader_catalog_shares_device_metadata_and_resolves_paths(tmp_path):
+    path = tmp_path / "weather.tsfile"
+    _write_weather_file(path, 100)
+
+    reader = TsFileSeriesReader(str(path), show_progress=False)
+    try:
+        assert reader.series_paths == [
+            "weather.device_a.temperature",
+            "weather.device_a.humidity",
+        ]
+        assert len(reader.catalog.table_entries) == 1
+        assert len(reader.catalog.device_entries) == 1
+        assert reader.catalog.series_count == 2
+
+        by_path = reader.get_series_info("weather.device_a.temperature")
+        by_ref = reader.get_series_info_by_ref(0, 0)
+        assert by_ref == by_path
+        assert by_ref["tag_values"] == {"device": "device_a"}
+
+        ts_by_path = reader.get_series_timestamps("weather.device_a.temperature")
+        ts_by_device = reader.get_device_timestamps(0)
+        np.testing.assert_array_equal(ts_by_path, ts_by_device)
+    finally:
+        reader.close()
diff --git a/python/tsfile/__init__.py b/python/tsfile/__init__.py
index 55fa3b9e..84ca330e 100644
--- a/python/tsfile/__init__.py
+++ b/python/tsfile/__init__.py
@@ -40,4 +40,5 @@ from .tsfile_reader import TsFileReaderPy as TsFileReader, ResultSetPy as Result
 from .tsfile_writer import TsFileWriterPy as TsFileWriter
 from .tsfile_py_cpp import get_tsfile_config, set_tsfile_config
 from .tsfile_table_writer import TsFileTableWriter
-from .utils import to_dataframe, dataframe_to_tsfile
\ No newline at end of file
+from .utils import to_dataframe, dataframe_to_tsfile
+from .dataset import TsFileDataFrame, Timeseries, AlignedTimeseries
diff --git a/python/tsfile/dataset/__init__.py b/python/tsfile/dataset/__init__.py
new file mode 100644
index 00000000..4072bd4c
--- /dev/null
+++ b/python/tsfile/dataset/__init__.py
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""Dataset-style TsFile accessors."""
+
+from .dataframe import TsFileDataFrame
+from .timeseries import AlignedTimeseries, Timeseries
+
+__all__ = ["TsFileDataFrame", "Timeseries", "AlignedTimeseries"]
diff --git a/python/tsfile/dataset/dataframe.py b/python/tsfile/dataset/dataframe.py
new file mode 100644
index 00000000..e28a0e5e
--- /dev/null
+++ b/python/tsfile/dataset/dataframe.py
@@ -0,0 +1,635 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""Top-level dataset accessors for TsFile shards."""
+
+from collections import defaultdict
+from dataclasses import dataclass, field
+import os
+import sys
+from typing import Dict, List, Set, Tuple, Union
+import warnings
+
+import numpy as np
+
+from .formatting import format_dataframe_table
+from .metadata import TableEntry, _coerce_path_component, build_logical_series_path, split_logical_series_path
+from .merge import build_aligned_matrix, merge_time_value_parts, merge_timestamp_parts
+from .timeseries import AlignedTimeseries, Timeseries
+
+
+DeviceKey = Tuple[str, tuple]
+SeriesRefKey = Tuple[int, int]
+SeriesRef = Tuple[object, int, int]
+DeviceRef = Tuple[object, int]
+
+_QUERY_START = np.iinfo(np.int64).min
+_QUERY_END = np.iinfo(np.int64).max
+
+
+@dataclass(slots=True)
+class _LogicalIndex:
+    """Cross-reader logical mapping for devices and series."""
+
+    # Shared table schema references keyed by table name.
+    table_entries: Dict[str, TableEntry] = field(default_factory=dict)
+
+    # Stable logical device order, each item is (table_name, tag_values).
+    device_order: List[DeviceKey] = field(default_factory=list)
+    # Map one logical device key to its dataframe-local device index.
+    device_index_by_key: Dict[DeviceKey, int] = field(default_factory=dict)
+    # For each logical device, keep the contributing reader-local device refs.
+    device_refs: List[List[DeviceRef]] = field(default_factory=list)
+
+    # Stable logical series order, each item is (device_idx, field_idx).
+    series_refs_ordered: List[SeriesRefKey] = field(default_factory=list)
+    # Map one logical series ref to the contributing reader-local series refs.
+    series_ref_map: Dict[SeriesRefKey, List[SeriesRef]] = field(default_factory=dict)
+    # Fast membership check for resolved series refs.
+    series_ref_set: Set[SeriesRefKey] = field(default_factory=set)
+
+
+@dataclass(slots=True)
+class _DerivedCache:
+    """Merged metadata derived from the logical index."""
+
+    devices: List[dict] = field(default_factory=list)
+    field_stats: Dict[SeriesRefKey, dict] = field(default_factory=dict)
+
+
+def _expand_paths(paths: Union[str, List[str]]) -> List[str]:
+    """Normalize file/directory inputs into a validated list of absolute 
TsFile paths."""
+    if isinstance(paths, str):
+        paths = [paths]
+
+    expanded = []
+    for path in paths:
+        if os.path.isdir(path):
+            tsfiles = sorted(
+                os.path.join(root, name)
+                for root, _, files in os.walk(path)
+                for name in files
+                if name.endswith(".tsfile")
+            )
+            if not tsfiles:
+                raise FileNotFoundError(f"No .tsfile files found in directory: {path}")
+            expanded.extend(tsfiles)
+        else:
+            expanded.append(path)
+
+    resolved = []
+    for path in expanded:
+        if not os.path.exists(path):
+            raise FileNotFoundError(f"TsFile not found: {path}")
+        resolved.append(os.path.abspath(path))
+    return resolved
+
+
+def _series_lookup_hint(name: str) -> str:
+    return f"Series not found: '{name}'. Use df.list_timeseries() to inspect 
available series."
+
+
+def _validate_table_schema(existing: TableEntry, incoming: TableEntry, file_path: str) -> None:
+    """Reject same-name tables whose tag/field layout differs across shards."""
+    if (
+        existing.tag_columns == incoming.tag_columns
+        and existing.tag_types == incoming.tag_types
+        and existing.field_columns == incoming.field_columns
+    ):
+        return
+
+    raise ValueError(
+        f"Incompatible schema for table '{incoming.table_name}' in 
'{file_path}'. "
+        f"Expected tags={list(existing.tag_columns)}, 
tag_types={list(existing.tag_types)}, "
+        f"fields={list(existing.field_columns)} but found "
+        f"tags={list(incoming.tag_columns)}, 
tag_types={list(incoming.tag_types)}, "
+        f"fields={list(incoming.field_columns)}."
+    )
+
+
+def _register_reader(
+    readers: Dict[str, object],
+    index: _LogicalIndex,
+    file_path: str,
+    reader,
+) -> None:
+    """Merge one reader's catalog into the dataframe-wide logical index."""
+    readers[file_path] = reader
+    catalog = reader.catalog
+
+    for table_entry in catalog.table_entries:
+        existing_entry = index.table_entries.get(table_entry.table_name)
+        if existing_entry is None:
+            index.table_entries[table_entry.table_name] = table_entry
+        else:
+            _validate_table_schema(existing_entry, table_entry, file_path)
+
+    for device_id, device_entry in enumerate(catalog.device_entries):
+        table_entry = catalog.table_entries[device_entry.table_id]
+        device_key = (table_entry.table_name, tuple(device_entry.tag_values))
+        device_idx = index.device_index_by_key.get(device_key)
+        if device_idx is None:
+            device_idx = len(index.device_order)
+            index.device_index_by_key[device_key] = device_idx
+            index.device_order.append(device_key)
+            index.device_refs.append([])
+        index.device_refs[device_idx].append((reader, device_id))
+
+        for field_idx in range(len(table_entry.field_columns)):
+            series_ref = (device_idx, field_idx)
+            if series_ref not in index.series_ref_map:
+                index.series_refs_ordered.append(series_ref)
+                index.series_ref_map[series_ref] = []
+            index.series_ref_map[series_ref].append((reader, device_id, field_idx))
+
+
+def _build_device_entry(refs: List[DeviceRef]) -> dict:
+    """Compute per-device time bounds after merging all contributing shards."""
+    # [Temporary] It will be replaced by query_by_row and metadata interface in TsFile
+    if len(refs) == 1:
+        merged_timestamps = refs[0][0].get_device_timestamps(refs[0][1])
+    else:
+        merged_timestamps = merge_timestamp_parts(
+            [reader.get_device_timestamps(device_id) for reader, device_id in refs],
+            validate_unique=True,
+        )
+
+    return {
+        "min_time": int(merged_timestamps[0]) if len(merged_timestamps) > 0 
else None,
+        "max_time": int(merged_timestamps[-1]) if len(merged_timestamps) > 0 
else None,
+    }
+
+
+def _merge_field_timestamps(series_name: str, refs: List[SeriesRef]) -> np.ndarray:
+    """Load and merge the full timestamp axis for one logical series on demand."""
+    # [Temporary] It will be replaced by query_by_row interface in TsFile
+    time_parts = []
+    for reader, device_id, field_idx in refs:
+        ts_arr, _ = reader.read_series_by_ref(device_id, field_idx, _QUERY_START, _QUERY_END)
+        if len(ts_arr) > 0:
+            time_parts.append(ts_arr)
+
+    if not time_parts:
+        merged_timestamps = np.array([], dtype=np.int64)
+    elif len(time_parts) == 1:
+        merged_timestamps = time_parts[0]
+    else:
+        try:
+            merged_timestamps = merge_timestamp_parts(time_parts, validate_unique=True)
+        except ValueError as e:
+            message = str(e)
+            duplicate_suffix = message.removeprefix("Duplicate timestamp ")
+            duplicate_suffix = duplicate_suffix.removesuffix(" found across shards.")
+            raise ValueError(
+                f"Duplicate timestamp {duplicate_suffix} found for series 
'{series_name}' across shards. "
+                f"Cross-shard duplicate timestamps are not supported."
+            ) from e
+
+    return merged_timestamps
+
+
+def _build_field_stats(refs: List[SeriesRef]) -> dict:
+    """Aggregate cheap per-shard stats without materializing full series 
values."""
+    min_time = None
+    max_time = None
+    count = 0
+
+    for reader, device_id, field_idx in refs:
+        info = reader.get_series_info_by_ref(device_id, field_idx)
+        shard_min = info["min_time"]
+        shard_max = info["max_time"]
+        shard_count = info["length"]
+
+        if shard_count == 0:
+            continue
+
+        count += shard_count
+        min_time = shard_min if min_time is None else min(min_time, shard_min)
+        max_time = shard_max if max_time is None else max(max_time, shard_max)
+
+    return {
+        "min_time": min_time,
+        "max_time": max_time,
+        "count": count,
+    }
+
+
+class _LocIndexer:
+    """Implement ``.loc[start_time:end_time, series_list]`` for aligned 
reads."""
+
+    def __init__(self, dataframe: "TsFileDataFrame"):
+        self._df = dataframe
+
+    def _parse_key(self, key):
+        if not isinstance(key, tuple) or len(key) != 2:
+            raise ValueError("loc requires exactly 2 arguments: 
tsdf.loc[start_time:end_time, series_list]")
+
+        time_slice, series_spec = key
+        if isinstance(time_slice, slice):
+            start_time = _QUERY_START if time_slice.start is None else time_slice.start
+            end_time = _QUERY_END if time_slice.stop is None else time_slice.stop
+        elif isinstance(time_slice, (int, np.integer)):
+            start_time = end_time = int(time_slice)
+        else:
+            raise TypeError(f"Time index must be slice or int, got 
{type(time_slice)}")
+
+        if isinstance(series_spec, (str, int, np.integer)):
+            series_spec = [series_spec]
+
+        series_refs = []
+        series_names = []
+        for item in series_spec:
+            if isinstance(item, (int, np.integer)):
+                idx = int(item)
+                if idx < 0:
+                    idx += len(self._df._index.series_refs_ordered)
+                if idx < 0 or idx >= len(self._df._index.series_refs_ordered):
+                    raise IndexError(f"Series index {item} out of range")
+                series_ref = self._df._index.series_refs_ordered[idx]
+            elif isinstance(item, str):
+                series_ref = self._df._resolve_series_name(item)
+            else:
+                raise TypeError(f"Series specifier must be int or str, got 
{type(item)}")
+            series_refs.append(series_ref)
+            series_names.append(self._df._build_series_name(series_ref))
+
+        return start_time, end_time, series_refs, series_names
+
+    def _query_aligned(self, start_time: int, end_time: int, series_refs: List[SeriesRefKey], series_names: List[str]):
+        """Batch aligned reads by reader/device, then merge per-series fragments."""
+        self._df._assert_open()
+        groups = defaultdict(list)
+        for col_idx, series_ref in enumerate(series_refs):
+            device_idx, field_idx = series_ref
+            device_info = self._df._cache.devices[device_idx]
+            if device_info["max_time"] is None or device_info["max_time"] < 
start_time or device_info["min_time"] > end_time:
+                continue
+
+            _, table_entry, _ = self._df._get_series_components(series_ref)
+            field_name = table_entry.field_columns[field_idx]
+            for reader, device_id, reader_field_idx in self._df._index.series_ref_map[series_ref]:
+                groups[(id(reader), device_id)].append(
+                    (col_idx, reader_field_idx, field_name, series_names[col_idx], reader, device_id)
+                )
+
+        series_time_parts = defaultdict(list)
+        series_value_parts = defaultdict(list)
+        for entries in groups.values():
+            reader = entries[0][4]
+            device_id = entries[0][5]
+            field_indices = list(dict.fromkeys(entry[1] for entry in entries))
+            ts_arr, field_vals = reader.read_device_fields_by_time_range(device_id, field_indices, start_time, end_time)
+            for _, _, field_name, series_name, _, _ in entries:
+                if len(ts_arr) > 0:
+                    series_time_parts[series_name].append(ts_arr)
+                    series_value_parts[series_name].append(field_vals[field_name])
+
+        series_data = {}
+        for name in series_names:
+            series_data[name] = merge_time_value_parts(series_time_parts[name], series_value_parts[name])
+
+        return build_aligned_matrix(series_names, series_data)
+
+    def __getitem__(self, key) -> AlignedTimeseries:
+        start_time, end_time, series_refs, series_names = self._parse_key(key)
+        timestamps, values = self._query_aligned(start_time, end_time, series_refs, series_names)
+        return AlignedTimeseries(timestamps, values, series_names)
+
+
+class TsFileDataFrame:
+    """Lazy-loaded unified numeric dataset view over multiple TsFile shards."""
+
+    def __init__(self, paths: Union[str, List[str]], show_progress: bool = True):
+        self._paths = _expand_paths(paths)
+        self._show_progress = show_progress
+        self._readers: Dict[str, object] = {}
+        self._index = _LogicalIndex()
+        self._cache = _DerivedCache()
+        self._is_view = False
+        self._root = None
+        self._closed = False
+        self._load_metadata()
+
+    @classmethod
+    def _from_subset(cls, parent: "TsFileDataFrame", series_refs: List[SeriesRefKey]) -> "TsFileDataFrame":
+        """Create a lightweight view that reuses the parent's readers and caches."""
+        obj = object.__new__(cls)
+        obj._root = parent._root if parent._is_view else parent
+        obj._is_view = True
+        obj._paths = parent._paths
+        obj._show_progress = parent._show_progress
+        obj._readers = parent._readers
+        obj._index = _LogicalIndex(
+            table_entries=parent._index.table_entries,
+            device_order=parent._index.device_order,
+            device_index_by_key=parent._index.device_index_by_key,
+            device_refs=parent._index.device_refs,
+            series_refs_ordered=list(series_refs),
+            series_ref_map=parent._index.series_ref_map,
+            series_ref_set=set(series_refs),
+        )
+        obj._cache = _DerivedCache(devices=parent._cache.devices, field_stats=parent._cache.field_stats)
+        obj._closed = False
+        return obj
+
+    def _owner(self) -> "TsFileDataFrame":
+        return self._root if self._is_view else self
+
+    def _assert_open(self):
+        if self._owner()._closed:
+            raise RuntimeError("Current TsFileDataFrame is closed.")
+
+    def _load_metadata(self):
+        """Build the logical cross-file index and the derived per-series 
caches."""
+        from .reader import TsFileSeriesReader
+
+        if len(self._paths) >= 2:
+            self._load_metadata_parallel(TsFileSeriesReader)
+        else:
+            self._load_metadata_serial(TsFileSeriesReader)
+
+        self._cache.devices = [_build_device_entry(refs) for refs in self._index.device_refs]
+        for series_ref in self._index.series_refs_ordered:
+            self._cache.field_stats[series_ref] = _build_field_stats(self._index.series_ref_map[series_ref])
+
+        self._index.series_ref_set = set(self._index.series_refs_ordered)
+        if not self._index.series_refs_ordered:
+            raise ValueError("No valid time series found in the provided 
TsFile files")
+
+    def _load_metadata_serial(self, reader_class):
+        for file_path in self._paths:
+            _register_reader(
+                self._readers,
+                self._index,
+                file_path,
+                reader_class(file_path, show_progress=self._show_progress),
+            )
+
+    def _load_metadata_parallel(self, reader_class):
+        from concurrent.futures import ThreadPoolExecutor, as_completed
+
+        def open_file(file_path):
+            return file_path, reader_class(file_path, show_progress=False)
+
+        total = len(self._paths)
+        with ThreadPoolExecutor(max_workers=min(total, os.cpu_count() or 4)) as executor:
+            futures = {executor.submit(open_file, path): path for path in self._paths}
+            results = {}
+            done = 0
+            for future in as_completed(futures):
+                file_path, reader = future.result()
+                results[file_path] = reader
+                done += 1
+                if self._show_progress:
+                    sys.stderr.write(f"\rLoading TsFile shards: {done}/{total}")
+                    sys.stderr.flush()
+
+        if self._show_progress and total > 0:
+            total_series = sum(reader.series_count for reader in results.values())
+            sys.stderr.write(f"\rLoading TsFile shards: {total}/{total} ({total_series} series) ... done\n")
+            sys.stderr.flush()
+
+        for file_path in self._paths:
+            _register_reader(
+                self._readers,
+                self._index,
+                file_path,
+                results[file_path],
+            )
+
+    def _get_series_components(self, series_ref: SeriesRefKey) -> Tuple[DeviceKey, TableEntry, int]:
+        device_idx, field_idx = series_ref
+        device_key = self._index.device_order[device_idx]
+        return device_key, self._index.table_entries[device_key[0]], field_idx
+
+    def _build_series_name(self, series_ref: SeriesRefKey) -> str:
+        device_key, table_entry, field_idx = self._get_series_components(series_ref)
+        table_name, tag_values = device_key
+        field_name = table_entry.field_columns[field_idx]
+        return build_logical_series_path(table_name, tag_values, field_name)
+
+    def _resolve_series_name(self, series_name: str) -> SeriesRefKey:
+        try:
+            parts = split_logical_series_path(series_name)
+        except ValueError as exc:
+            raise KeyError(_series_lookup_hint(series_name)) from exc
+        if len(parts) < 2:
+            raise KeyError(_series_lookup_hint(series_name))
+
+        table_name = parts[0]
+        if table_name not in self._index.table_entries:
+            raise KeyError(_series_lookup_hint(series_name))
+
+        table_entry = self._index.table_entries[table_name]
+        expected_parts = len(table_entry.tag_columns) + 2
+        if len(parts) != expected_parts:
+            raise KeyError(_series_lookup_hint(series_name))
+
+        field_name = parts[-1]
+        try:
+            field_idx = table_entry.get_field_index(field_name)
+        except ValueError as exc:
+            raise KeyError(_series_lookup_hint(series_name)) from exc
+
+        tag_values = tuple(
+            _coerce_path_component(raw_value, tag_type)
+            for raw_value, tag_type in zip(parts[1:-1], table_entry.tag_types)
+        )
+        device_key = (table_name, tag_values)
+        device_idx = self._index.device_index_by_key.get(device_key)
+        if device_idx is None:
+            raise KeyError(_series_lookup_hint(series_name))
+
+        series_ref = (device_idx, field_idx)
+        if series_ref not in self._index.series_ref_set:
+            raise KeyError(_series_lookup_hint(series_name))
+        return series_ref
+
+    def _build_series_info(self, series_ref: SeriesRefKey) -> dict:
+        device_idx, field_idx = series_ref
+        device_key, table_entry, _ = self._get_series_components(series_ref)
+        field_stats = self._cache.field_stats[series_ref]
+        return {
+            "table_name": table_entry.table_name,
+            "field": table_entry.field_columns[field_idx],
+            "tag_columns": table_entry.tag_columns,
+            "tag_values": dict(zip(table_entry.tag_columns, device_key[1])),
+            "min_time": field_stats["min_time"],
+            "max_time": field_stats["max_time"],
+            "count": field_stats["count"],
+        }
+
+    def __len__(self) -> int:
+        return len(self._index.series_refs_ordered)
+
+    def list_timeseries(self, path_prefix: str = "") -> List[str]:
+        names = [self._build_series_name(series_ref) for series_ref in self._index.series_refs_ordered]
+        if not path_prefix:
+            return names
+        prefix = path_prefix if path_prefix.endswith(".") else path_prefix + "."
+        return [name for name in names if name.startswith(prefix) or name == path_prefix]
+
+    def _get_timeseries(self, series_ref: SeriesRefKey) -> Timeseries:
+        self._assert_open()
+        series_name = self._build_series_name(series_ref)
+        return Timeseries(
+            series_name,
+            self._index.series_ref_map[series_ref],
+            self._cache.field_stats[series_ref],
+            self._assert_open,
+            lambda: _merge_field_timestamps(series_name, self._index.series_ref_map[series_ref]),
+        )
+
+    def __getitem__(self, key):
+        try:
+            import pandas as pd
+
+            if isinstance(key, pd.Series) and key.dtype == bool:
+                selected = [self._index.series_refs_ordered[idx] for idx in key.index[key]]
+                return TsFileDataFrame._from_subset(self, selected)
+        except ImportError:
+            pass
+
+        if isinstance(key, (int, np.integer)):
+            idx = int(key)
+            if idx < 0:
+                idx += len(self._index.series_refs_ordered)
+            if idx < 0 or idx >= len(self._index.series_refs_ordered):
+                raise IndexError(f"Index {idx} out of range [0, 
{len(self._index.series_refs_ordered)})")
+            return self._get_timeseries(self._index.series_refs_ordered[idx])
+
+        if isinstance(key, str):
+            try:
+                return self._get_timeseries(self._resolve_series_name(key))
+            except KeyError:
+                pass
+
+            valid_columns = {"table", "field", "start_time", "end_time", 
"count"}
+            valid_columns.update(self._collect_tag_columns())
+            if key not in valid_columns:
+                raise KeyError(_series_lookup_hint(key))
+
+            import pandas as pd
+
+            values = []
+            for series_ref in self._index.series_refs_ordered:
+                info = self._build_series_info(series_ref)
+                if key == "table":
+                    values.append(info["table_name"])
+                elif key == "field":
+                    values.append(info["field"])
+                elif key == "start_time":
+                    values.append(info["min_time"])
+                elif key == "end_time":
+                    values.append(info["max_time"])
+                elif key == "count":
+                    values.append(info["count"])
+                else:
+                    values.append(info["tag_values"].get(key, ""))
+            return pd.Series(values, name=key)
+
+        if isinstance(key, slice):
+            return TsFileDataFrame._from_subset(
+                self,
+                [self._index.series_refs_ordered[idx] for idx in range(*key.indices(len(self._index.series_refs_ordered)))],
+            )
+
+        if isinstance(key, list):
+            selected = []
+            for item in key:
+                if not isinstance(item, (int, np.integer)):
+                    raise TypeError(f"List index must contain integers, got 
{type(item)}")
+                idx = int(item)
+                if idx < 0:
+                    idx += len(self._index.series_refs_ordered)
+                if idx < 0 or idx >= len(self._index.series_refs_ordered):
+                    raise IndexError(f"Index {item} out of range [0, 
{len(self._index.series_refs_ordered)})")
+                selected.append(self._index.series_refs_ordered[idx])
+            return TsFileDataFrame._from_subset(self, selected)
+
+        raise TypeError(f"Unsupported key type: {type(key)}")
+
+    @property
+    def loc(self):
+        return _LocIndexer(self)
+
+    def _collect_tag_columns(self) -> List[str]:
+        seen = {}
+        for table_name, _ in self._index.device_order:
+            for column in self._index.table_entries[table_name].tag_columns:
+                seen.setdefault(column, True)
+        return list(seen.keys())
+
+    def _format_table(self, indices=None, max_rows: int = 20) -> str:
+        series_names = []
+        merged_info = {}
+        for series_ref in self._index.series_refs_ordered:
+            series_name = self._build_series_name(series_ref)
+            series_names.append(series_name)
+            merged_info[series_name] = self._build_series_info(series_ref)
+
+        return format_dataframe_table(
+            series_names,
+            merged_info,
+            self._collect_tag_columns(),
+            indices=indices,
+            max_rows=max_rows,
+        )
+
+    def _repr_header(self) -> str:
+        total = len(self._index.series_refs_ordered)
+        if self._is_view:
+            return f"TsFileDataFrame({total} time series, subset of 
{len(self._root._index.series_refs_ordered)})\n"
+        return f"TsFileDataFrame({total} time series, {len(self._paths)} 
files)\n"
+
+    def __repr__(self):
+        return self._repr_header() + self._format_table()
+
+    def __str__(self):
+        return self.__repr__()
+
+    def show(self, max_rows: int = 20):
+        print(self._repr_header() + self._format_table(max_rows=max_rows))
+
+    def close(self):
+        if self._is_view:
+            warnings.warn(
+                "close() on a subset TsFileDataFrame is a no-op; only the root 
dataframe owns the readers.",
+                RuntimeWarning,
+                stacklevel=2,
+            )
+            return
+        if self._closed:
+            return
+        for reader in self._readers.values():
+            reader.close()
+        self._readers.clear()
+        self._closed = True
+
+    def __del__(self):
+        try:
+            if not getattr(self, "_is_view", False):
+                self.close()
+        except Exception:
+            pass
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args):
+        self.close()
diff --git a/python/tsfile/dataset/formatting.py b/python/tsfile/dataset/formatting.py
new file mode 100644
index 00000000..5e01bb39
--- /dev/null
+++ b/python/tsfile/dataset/formatting.py
@@ -0,0 +1,171 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""String formatting helpers for dataset objects."""
+
+from datetime import datetime
+from typing import Dict, List, Optional
+
+import numpy as np
+
+
+def format_timestamp(ts_ms: int) -> str:
+    """Convert millisecond timestamp to human-readable string."""
+    try:
+        dt = datetime.fromtimestamp(ts_ms / 1000)
+        if ts_ms % 1000 == 0:
+            return dt.strftime("%Y-%m-%d %H:%M:%S")
+        return dt.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
+    except (OSError, ValueError, TypeError):
+        return str(ts_ms)
+
+
+def format_aligned_timeseries(
+    timestamps: np.ndarray,
+    values: np.ndarray,
+    series_names: List[str],
+    max_rows: Optional[int],
+) -> str:
+    """Render a table-like string for aligned query results.
+
+    Uses head/tail truncation with '...' when rows exceed *max_rows*,
+    consistent with format_dataframe_table.
+    """
+    n_rows, n_cols = values.shape
+    if n_rows == 0:
+        return f"AlignedTimeseries(0 rows, {n_cols} series)"
+
+    # Determine which rows to render (head + tail when truncated)
+    if max_rows is not None and n_rows > max_rows:
+        head = max_rows // 2
+        tail = max_rows - head
+        show_indices = list(range(head)) + list(range(n_rows - tail, n_rows))
+        truncated = True
+    else:
+        show_indices = list(range(n_rows))
+        truncated = False
+
+    # Only format the rows we will actually display
+    ts_strs = {i: format_timestamp(int(timestamps[i])) for i in show_indices}
+    ts_width = max(max((len(s) for s in ts_strs.values()), default=0), len("time"))
+
+    col_widths = []
+    rendered_values = []  # list of dicts: row_idx -> cell string
+    for col_idx in range(n_cols):
+        col_name = series_names[col_idx] if col_idx < len(series_names) else f"col_{col_idx}"
+        width = len(col_name)
+        column = {}
+        for row_idx in show_indices:
+            value = values[row_idx, col_idx]
+            cell = "NaN" if np.isnan(value) else f"{value:.2f}"
+            column[row_idx] = cell
+            width = max(width, len(cell))
+        rendered_values.append(column)
+        col_widths.append(width)
+
+    header = ["time".rjust(ts_width)]
+    for col_idx, width in enumerate(col_widths):
+        col_name = series_names[col_idx] if col_idx < len(series_names) else f"col_{col_idx}"
+        header.append(col_name.rjust(width))
+    lines = ["  ".join(header)]
+
+    head_count = max_rows // 2 if truncated else len(show_indices)
+    for i, row_idx in enumerate(show_indices):
+        if truncated and i == head_count:
+            lines.append("...")
+        parts = [ts_strs[row_idx].rjust(ts_width)]
+        for col_idx, width in enumerate(col_widths):
+            parts.append(rendered_values[col_idx][row_idx].rjust(width))
+        lines.append("  ".join(parts))
+
+    return f"AlignedTimeseries({n_rows} rows, {n_cols} series)\n" + 
"\n".join(lines)
+
+
+def format_dataframe_table(
+    series_list: List[str],
+    merged_info: Dict[str, dict],
+    tag_columns: List[str],
+    indices: Optional[List[int]] = None,
+    max_rows: int = 20,
+) -> str:
+    """Render the metadata table used by TsFileDataFrame.__repr__."""
+    if indices is None:
+        indices = list(range(len(series_list)))
+    else:
+        indices = list(indices)
+
+    total = len(indices)
+    if total > max_rows:
+        show_indices = list(indices[: max_rows // 2]) + list(indices[-max_rows // 2 :])
+        truncated = True
+    else:
+        show_indices = indices
+        truncated = False
+
+    rows = []
+    for idx in show_indices:
+        name = series_list[idx]
+        info = merged_info[name]
+        row = {
+            "index": idx,
+            "table": info["table_name"],
+            "field": info["field"],
+            "start_time": format_timestamp(info["min_time"]),
+            "end_time": format_timestamp(info["max_time"]),
+            "count": info["count"],
+        }
+        for tag_col in tag_columns:
+            row[tag_col] = info["tag_values"].get(tag_col, "")
+        rows.append(row)
+
+    if not rows:
+        return "Empty TsFileDataFrame"
+
+    headers = ["", "table"] + tag_columns + ["field", "start_time", 
"end_time", "count"]
+    widths = {header: len(header) for header in headers}
+    widths[""] = max(len(str(row["index"])) for row in rows)
+
+    for row in rows:
+        widths[""] = max(widths[""], len(str(row["index"])))
+        widths["table"] = max(widths["table"], len(row["table"]))
+        widths["field"] = max(widths["field"], len(row["field"]))
+        widths["start_time"] = max(widths["start_time"], 
len(row["start_time"]))
+        widths["end_time"] = max(widths["end_time"], len(row["end_time"]))
+        widths["count"] = max(widths["count"], len(str(row["count"])))
+        for tag_col in tag_columns:
+            widths[tag_col] = max(widths[tag_col], len(str(row[tag_col])))
+
+    lines = ["  ".join(header.rjust(widths[header]) for header in headers)]
+    split = len(rows) // 2 if truncated else len(rows)
+    for row_idx, row in enumerate(rows):
+        if truncated and row_idx == split:
+            lines.append("...")
+        parts = [str(row["index"]).rjust(widths[""]), 
row["table"].rjust(widths["table"])]
+        for tag_col in tag_columns:
+            parts.append(str(row[tag_col]).rjust(widths[tag_col]))
+        parts.extend(
+            [
+                row["field"].rjust(widths["field"]),
+                row["start_time"].rjust(widths["start_time"]),
+                row["end_time"].rjust(widths["end_time"]),
+                str(row["count"]).rjust(widths["count"]),
+            ]
+        )
+        lines.append("  ".join(parts))
+
+    return "\n".join(lines)
diff --git a/python/tsfile/dataset/merge.py b/python/tsfile/dataset/merge.py
new file mode 100644
index 00000000..1f72290e
--- /dev/null
+++ b/python/tsfile/dataset/merge.py
@@ -0,0 +1,154 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""Merge helpers for dataset reads.
+
+The dataset package enforces a strict cross-shard merge policy:
+- only numeric-compatible field columns are exposed,
+- null numeric values are represented as ``NaN``,
+- duplicate timestamps for the same logical series across shards are rejected.
+"""
+
+import heapq
+from typing import Dict, List, Tuple
+
+import numpy as np
+
+
+def merge_timestamp_parts(
+    time_parts: List[np.ndarray],
+    *,
+    deduplicate: bool = False,
+    validate_unique: bool = False,
+) -> np.ndarray:
+    """Merge sorted timestamp parts with optional deduplication or 
validation."""
+    parts = [ts_part for ts_part in time_parts if len(ts_part) > 0]
+    if not parts:
+        return np.array([], dtype=np.int64)
+    if len(parts) == 1:
+        return parts[0]
+
+    parts.sort(key=lambda ts_part: int(ts_part[0]))
+    if all(int(parts[idx - 1][-1]) < int(parts[idx][0]) for idx in range(1, len(parts))):
+        return np.concatenate(parts)
+
+    total_length = sum(len(ts_part) for ts_part in parts)
+    merged = np.empty(total_length, dtype=np.int64)
+
+    heap = [(int(ts_part[0]), part_idx, 0) for part_idx, ts_part in enumerate(parts)]
+    heapq.heapify(heap)
+
+    out_idx = 0
+    last_ts = None
+    while heap:
+        ts, part_idx, offset = heapq.heappop(heap)
+
+        if last_ts is not None and ts == last_ts:
+            if validate_unique:
+                raise ValueError(f"Duplicate timestamp {ts} found across 
shards.")
+            if not deduplicate:
+                merged[out_idx] = ts
+                out_idx += 1
+        else:
+            merged[out_idx] = ts
+            out_idx += 1
+            last_ts = ts
+
+        next_offset = offset + 1
+        if next_offset < len(parts[part_idx]):
+            heapq.heappush(heap, (int(parts[part_idx][next_offset]), part_idx, next_offset))
+
+    if validate_unique:
+        return merged[:out_idx]
+    if deduplicate:
+        return merged[:out_idx]
+    return merged[:out_idx]
+
+
+def merge_time_value_parts(
+    time_parts: List[np.ndarray],
+    value_parts: List[np.ndarray],
+) -> Tuple[np.ndarray, np.ndarray]:
+    """Merge sorted time/value parts for one logical series.
+
+    Duplicate timestamps are validated during metadata loading, so the query
+    path can assume each part is already sorted and conflict-free.
+
+    Fast path: if shard ranges do not overlap in time, concatenate in shard
+    order after sorting parts by their first timestamp.
+    Fallback: use a k-way merge for overlapping-but-disjoint ranges.
+    """
+    parts = [(ts_part, val_part) for ts_part, val_part in zip(time_parts, value_parts) if len(ts_part) > 0]
+    if not parts:
+        return np.array([], dtype=np.int64), np.array([], dtype=np.float64)
+    if len(parts) == 1:
+        return parts[0]
+
+    parts.sort(key=lambda item: int(item[0][0]))
+    time_parts = [ts_part for ts_part, _ in parts]
+    value_parts = [val_part for _, val_part in parts]
+
+    if all(int(time_parts[idx - 1][-1]) < int(time_parts[idx][0]) for idx in range(1, len(time_parts))):
+        return np.concatenate(time_parts), np.concatenate(value_parts)
+
+    total_length = sum(len(ts_part) for ts_part in time_parts)
+    merged_ts = np.empty(total_length, dtype=np.int64)
+    merged_vals = np.empty(total_length, dtype=np.float64)
+
+    heap = [(int(ts_part[0]), part_idx, 0) for part_idx, ts_part in enumerate(time_parts)]
+    heapq.heapify(heap)
+
+    out_idx = 0
+    while heap:
+        _, part_idx, offset = heapq.heappop(heap)
+        merged_ts[out_idx] = time_parts[part_idx][offset]
+        merged_vals[out_idx] = value_parts[part_idx][offset]
+        out_idx += 1
+
+        next_offset = offset + 1
+        if next_offset < len(time_parts[part_idx]):
+            heapq.heappush(heap, (int(time_parts[part_idx][next_offset]), part_idx, next_offset))
+
+    return merged_ts, merged_vals
+
+
+def build_aligned_matrix(
+    series_names: List[str], series_data: Dict[str, Tuple[np.ndarray, np.ndarray]]
+) -> Tuple[np.ndarray, np.ndarray]:
+    """Build a timestamp union and aligned value matrix for multiple series.
+
+    Each input series is assumed to already satisfy the dataset merge policy,
+    meaning its timestamp array is unique within that logical series.
+    """
+    all_ts_arrays = [ts for ts, _ in series_data.values() if len(ts) > 0]
+    if not all_ts_arrays:
+        return np.array([], dtype=np.int64), np.empty((0, len(series_names)))
+
+    timestamps = merge_timestamp_parts(all_ts_arrays, deduplicate=True)
+    values = np.full((len(timestamps), len(series_names)), np.nan)
+
+    for col_idx, name in enumerate(series_names):
+        if name not in series_data:
+            continue
+        ts_arr, val_arr = series_data[name]
+        if len(ts_arr) == 0:
+            continue
+        indices = np.searchsorted(timestamps, ts_arr)
+        values[indices, col_idx] = val_arr
+
+    return timestamps, values
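A similar sketch for the alignment helper, with two hypothetical series whose timestamps only partially overlap; samples missing from a series become NaN in the aligned matrix:

    import numpy as np
    from tsfile.dataset.merge import build_aligned_matrix

    series_data = {
        "s1": (np.array([1, 2], dtype=np.int64), np.array([10.0, 20.0])),
        "s2": (np.array([2, 3], dtype=np.int64), np.array([0.2, 0.3])),
    }
    timestamps, values = build_aligned_matrix(["s1", "s2"], series_data)
    # timestamps -> [1 2 3]
    # values     -> [[10.0, nan],
    #                [20.0, 0.2],
    #                [ nan, 0.3]]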
diff --git a/python/tsfile/dataset/metadata.py b/python/tsfile/dataset/metadata.py
new file mode 100644
index 00000000..5c4611e0
--- /dev/null
+++ b/python/tsfile/dataset/metadata.py
@@ -0,0 +1,227 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""Shared metadata models for dataset readers and views."""
+
+from dataclasses import dataclass, field
+from typing import Any, Dict, Iterable, Iterator, List, Tuple
+
+import numpy as np
+
+from ..constants import TSDataType
+
+
+_PATH_SEPARATOR = "."
+_PATH_ESCAPE = "\\"
+
+
+@dataclass(slots=True)
+class TableEntry:
+    """Schema-level metadata shared by every device in one table."""
+
+    table_name: str
+    tag_columns: Tuple[str, ...]
+    tag_types: Tuple[TSDataType, ...]
+    field_columns: Tuple[str, ...]
+    _field_index_by_name: Dict[str, int] = field(init=False, repr=False)
+
+    def __post_init__(self):
+        self._field_index_by_name = {column: idx for idx, column in enumerate(self.field_columns)}
+
+    def get_field_index(self, field_name: str) -> int:
+        if field_name not in self._field_index_by_name:
+            raise ValueError(f"Field not found in table '{self.table_name}': {field_name}")
+        return self._field_index_by_name[field_name]
+
+
+@dataclass(slots=True)
+class DeviceEntry:
+    """One logical device identified by table_id + ordered tag values.
+
+    The table_id refers to MetadataCatalog.table_entries[table_id].
+    """
+
+    table_id: int
+    tag_values: Tuple[Any, ...]
+    timestamps: np.ndarray
+    length: int
+    min_time: int
+    max_time: int
+
+
+@dataclass(slots=True)
+class MetadataCatalog:
+    """Canonical metadata store shared by dataset readers and dataframes."""
+
+    table_entries: List[TableEntry] = field(default_factory=list)
+    device_entries: List[DeviceEntry] = field(default_factory=list)
+    table_id_by_name: Dict[str, int] = field(default_factory=dict)
+    device_id_by_key: Dict[Tuple[int, tuple], int] = field(default_factory=dict)
+
+    def add_table(
+        self,
+        table_name: str,
+        tag_columns: Iterable[str],
+        tag_types: Iterable[TSDataType],
+        field_columns: Iterable[str],
+    ) -> int:
+        table_id = len(self.table_entries)
+        self.table_entries.append(
+            TableEntry(
+                table_name=table_name,
+                tag_columns=tuple(tag_columns),
+                tag_types=tuple(tag_types),
+                field_columns=tuple(field_columns),
+            )
+        )
+        self.table_id_by_name[table_name] = table_id
+        return table_id
+
+    def add_device(self, table_id: int, tag_values: tuple, timestamps: np.ndarray) -> int:
+        key = (table_id, tuple(tag_values))
+        if key in self.device_id_by_key:
+            return self.device_id_by_key[key]
+
+        timestamps = np.sort(timestamps)
+        if len(timestamps) == 0:
+            raise ValueError("Cannot register a device without timestamps.")
+
+        device_id = len(self.device_entries)
+        self.device_entries.append(
+            DeviceEntry(
+                table_id=table_id,
+                tag_values=tuple(tag_values),
+                timestamps=timestamps,
+                length=len(timestamps),
+                min_time=int(timestamps[0]),
+                max_time=int(timestamps[-1]),
+            )
+        )
+        self.device_id_by_key[key] = device_id
+        return device_id
+
+    @property
+    def series_count(self) -> int:
+        return sum(len(self.table_entries[device.table_id].field_columns) for device in self.device_entries)
+
+
+def _escape_path_component(value: Any) -> str:
+    return str(value).replace(_PATH_ESCAPE, _PATH_ESCAPE * 2).replace(_PATH_SEPARATOR, _PATH_ESCAPE + _PATH_SEPARATOR)
+
+
+def split_logical_series_path(series_path: str) -> List[str]:
+    parts = []
+    current = []
+    escaping = False
+
+    for char in series_path:
+        if escaping:
+            current.append(char)
+            escaping = False
+            continue
+        if char == _PATH_ESCAPE:
+            escaping = True
+            continue
+        if char == _PATH_SEPARATOR:
+            parts.append("".join(current))
+            current = []
+            continue
+        current.append(char)
+
+    if escaping:
+        raise ValueError(f"Invalid series path: {series_path}")
+
+    parts.append("".join(current))
+    return parts
+
+
+def build_logical_series_path(table_name: str, tag_values: Iterable[Any], field_name: str) -> str:
+    components = [table_name, *tag_values, field_name]
+    return _PATH_SEPARATOR.join(_escape_path_component(component) for component in components)
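A short sketch of the escaping round trip, with a hypothetical tag value that contains the '.' separator:

    from tsfile.dataset.metadata import build_logical_series_path, split_logical_series_path

    path = build_logical_series_path("weather", ["device.a"], "temperature")
    # path  -> 'weather.device\\.a.temperature' (the dot inside the tag value is escaped)
    parts = split_logical_series_path(path)
    # parts -> ['weather', 'device.a', 'temperature']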
+
+
+def build_series_path(catalog: MetadataCatalog, device_id: int, field_idx: int) -> str:
+    """Return the external logical series name for one device field."""
+    device_entry = catalog.device_entries[device_id]
+    table_entry = catalog.table_entries[device_entry.table_id]
+    field_name = table_entry.field_columns[field_idx]
+    return build_logical_series_path(table_entry.table_name, device_entry.tag_values, field_name)
+
+
+def iter_series_refs(catalog: MetadataCatalog) -> Iterator[Tuple[int, int]]:
+    """Yield ``(device_id, field_idx)`` pairs in catalog order."""
+    for device_id, device_entry in enumerate(catalog.device_entries):
+        table_entry = catalog.table_entries[device_entry.table_id]
+        for field_idx in range(len(table_entry.field_columns)):
+            yield device_id, field_idx
+
+
+def iter_series_paths(catalog: MetadataCatalog) -> Iterator[str]:
+    """Yield logical series names in catalog order."""
+    for device_id, field_idx in iter_series_refs(catalog):
+        yield build_series_path(catalog, device_id, field_idx)
+
+
+def resolve_series_path(catalog: MetadataCatalog, series_path: str) -> Tuple[int, int, int]:
+    """Resolve an external path to ``(table_id, device_id, field_idx)``."""
+    parts = split_logical_series_path(series_path)
+    if len(parts) < 2:
+        raise ValueError(f"Invalid series path: {series_path}")
+
+    table_name = parts[0]
+    if table_name not in catalog.table_id_by_name:
+        raise ValueError(f"Series not found: {series_path}")
+
+    table_id = catalog.table_id_by_name[table_name]
+    table_entry = catalog.table_entries[table_id]
+    expected_parts = len(table_entry.tag_columns) + 2
+    if len(parts) != expected_parts:
+        raise ValueError(f"Series not found: {series_path}")
+
+    field_name = parts[-1]
+    try:
+        field_idx = table_entry.get_field_index(field_name)
+    except ValueError as exc:
+        raise ValueError(f"Series not found: {series_path}") from exc
+
+    tag_values = tuple(
+        _coerce_path_component(raw_value, tag_type)
+        for raw_value, tag_type in zip(parts[1:-1], table_entry.tag_types)
+    )
+    key = (table_id, tag_values)
+    if key not in catalog.device_id_by_key:
+        raise ValueError(f"Series not found: {series_path}")
+
+    return table_id, catalog.device_id_by_key[key], field_idx
+
+
+def _coerce_path_component(value: str, data_type: TSDataType) -> Any:
+    if data_type in {TSDataType.STRING, TSDataType.TEXT, TSDataType.BLOB}:
+        return value
+    if data_type == TSDataType.BOOLEAN:
+        lowered = value.lower()
+        if lowered == "true":
+            return True
+        if lowered == "false":
+            return False
+        raise ValueError(f"Invalid boolean tag value: {value}")
+    if data_type in {TSDataType.INT32, TSDataType.INT64, TSDataType.TIMESTAMP, TSDataType.DATE}:
+        return int(value)
+    if data_type in {TSDataType.FLOAT, TSDataType.DOUBLE}:
+        return float(value)
+    return value
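A minimal catalog sketch tying the helpers above together (the table name, tag, and timestamps are hypothetical):

    import numpy as np
    from tsfile import TSDataType
    from tsfile.dataset.metadata import MetadataCatalog, resolve_series_path

    catalog = MetadataCatalog()
    table_id = catalog.add_table("weather", ["device"], [TSDataType.STRING], ["temperature"])
    catalog.add_device(table_id, ("device_a",), np.array([1, 2, 3], dtype=np.int64))
    resolve_series_path(catalog, "weather.device_a.temperature")
    # -> (0, 0, 0), i.e. (table_id, device_id, field_idx)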
diff --git a/python/tsfile/dataset/reader.py b/python/tsfile/dataset/reader.py
new file mode 100644
index 00000000..365dc884
--- /dev/null
+++ b/python/tsfile/dataset/reader.py
@@ -0,0 +1,354 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""Single-file reader backend used by TsFileDataFrame."""
+
+import os
+import sys
+from typing import Dict, Iterator, List, Tuple
+
+import numpy as np
+import pyarrow.compute as pc
+
+from ..constants import ColumnCategory, TSDataType
+from ..tsfile_reader import TsFileReaderPy
+from .metadata import MetadataCatalog, build_series_path, iter_series_refs, resolve_series_path
+
+
+_NUMERIC_FIELD_TYPES = {
+    TSDataType.BOOLEAN,
+    TSDataType.INT32,
+    TSDataType.INT64,
+    TSDataType.FLOAT,
+    TSDataType.DOUBLE,
+    TSDataType.TIMESTAMP,
+}
+
+
+def _to_python_scalar(value):
+    return value.item() if hasattr(value, "item") else value
+
+
+class TsFileSeriesReader:
+    """Wrap ``TsFileReaderPy`` with numeric dataset discovery and batch reads."""
+
+    def __init__(self, file_path: str, show_progress: bool = True):
+        if not os.path.exists(file_path):
+            raise FileNotFoundError(f"TsFile not found: {file_path}")
+
+        self.file_path = file_path
+        self.show_progress = show_progress
+
+        try:
+            self._reader = TsFileReaderPy(file_path)
+        except Exception as e:
+            raise ValueError(f"Failed to open TsFile: {e}") from e
+
+        self._catalog = MetadataCatalog()
+        self._cache_metadata()
+
+    def __del__(self):
+        self.close()
+
+    @property
+    def catalog(self) -> MetadataCatalog:
+        return self._catalog
+
+    @property
+    def series_paths(self) -> List[str]:
+        return list(self.iter_series_paths())
+
+    @property
+    def series_count(self) -> int:
+        return self._catalog.series_count
+
+    def iter_series_paths(self) -> Iterator[str]:
+        for device_id, field_idx in iter_series_refs(self._catalog):
+            yield build_series_path(self._catalog, device_id, field_idx)
+
+    def iter_series_refs(self) -> Iterator[Tuple[str, int, int]]:
+        for device_id, field_idx in iter_series_refs(self._catalog):
+            yield build_series_path(self._catalog, device_id, field_idx), device_id, field_idx
+
+    def close(self):
+        if hasattr(self, "_reader"):
+            try:
+                self._reader.close()
+            except Exception:
+                pass
+
+    def _cache_metadata(self):
+        """Wrap metadata discovery so reader construction surfaces one stable error shape."""
+        try:
+            self._cache_metadata_table_model()
+            # TODO: support the tree model as well
+        except Exception as e:
+            raise ValueError(
+                f"Failed to read TsFile metadata. Please ensure the TsFile is valid and readable. Error: {e}"
+            ) from e
+
+    def _cache_metadata_table_model(self):
+        """Build the in-memory catalog by scanning table batches from the file."""
+        table_schemas = self._reader.get_all_table_schemas()
+        if not table_schemas:
+            raise ValueError("No tables found in TsFile")
+
+        self._catalog = MetadataCatalog()
+        total_rows = 0
+        table_names = list(table_schemas.keys())
+
+        for table_index, table_name in enumerate(table_names):
+            table_schema = table_schemas[table_name]
+
+            tag_columns = []
+            tag_types = []
+            field_columns = []
+            for column_schema in table_schema.get_columns():
+                column_name = column_schema.get_column_name()
+                column_category = column_schema.get_category()
+                if column_category == ColumnCategory.TIME:
+                    continue
+                if column_category == ColumnCategory.TAG:
+                    tag_columns.append(column_name)
+                    tag_types.append(column_schema.get_data_type())
+
+                # Ignore fields that are not numeric; we don't use them currently.
+                elif (
+                    column_category == ColumnCategory.FIELD
+                    and column_schema.get_data_type() in _NUMERIC_FIELD_TYPES
+                ):
+                    field_columns.append(column_name)
+
+            if not field_columns:
+                continue
+
+            table_id = self._catalog.add_table(table_name, tag_columns, tag_types, field_columns)
+            time_arrays = []
+            tag_arrays = {tag_column: [] for tag_column in tag_columns}
+
+            # [Temporary] This will be replaced by a new TsFile API so that we no longer query all the data here.
+            query_columns = tag_columns + field_columns
+
+            with self._reader.query_table_batch(table_name, query_columns, batch_size=65536) as result_set:
+                while True:
+                    arrow_table = result_set.read_arrow_batch()
+                    if arrow_table is None:
+                        break
+                    batch_rows = arrow_table.num_rows
+                    total_rows += batch_rows
+                    time_arrays.append(arrow_table.column("time").to_numpy())
+                    for tag_column in tag_columns:
+                        tag_arrays[tag_column].append(arrow_table.column(tag_column).to_numpy())
+
+                    if self.show_progress:
+                        sys.stderr.write(
+                            f"\rReading TsFile metadata: table {table_index + 1}/{len(table_names)} "
+                            f"[{table_name}] ({total_rows:,} rows)"
+                        )
+                        sys.stderr.flush()
+
+            if not time_arrays:
+                continue
+
+            timestamps = np.concatenate(time_arrays).astype(np.int64)
+            if not tag_columns:
+                self._add_device(table_id, (), timestamps)
+                continue
+
+            for tag_values, device_timestamps in self._iter_device_groups(tag_columns, timestamps, tag_arrays):
+                self._add_device(table_id, tag_values, device_timestamps)
+
+        if self.show_progress and total_rows > 0:
+            sys.stderr.write(
+                f"\rReading TsFile metadata: {len(table_names)} table(s), {total_rows:,} rows, "
+                f"{self.series_count} series ... done\n"
+            )
+            sys.stderr.flush()
+
+        if self.series_count == 0:
+            raise ValueError("No valid numeric series found in TsFile")
+
+    def _iter_device_groups(
+        self,
+        tag_columns: List[str],
+        timestamps: np.ndarray,
+        tag_arrays: Dict[str, list],
+    ) -> Iterator[Tuple[tuple, np.ndarray]]:
+        """Group one table's rows by tag tuple while preserving original row membership."""
+        tag_values_by_column = {column: np.concatenate(tag_arrays[column]) for column in tag_columns}
+
+        n = len(timestamps)
+        arrays = [tag_values_by_column[col] for col in tag_columns]
+        dtype = np.dtype([(col, arrays[i].dtype) for i, col in enumerate(tag_columns)])
+        composite = np.empty(n, dtype=dtype)
+        for i, col in enumerate(tag_columns):
+            composite[col] = arrays[i]
+
+        _, inverse, counts = np.unique(composite, return_inverse=True, return_counts=True)
+        ordered_indices = np.argsort(inverse, kind="stable")
+        group_bounds = np.cumsum(counts)[:-1]
+        for group_indices in np.split(ordered_indices, group_bounds):
+            first = int(group_indices[0])
+            tag_tuple = tuple(_to_python_scalar(composite[col][first]) for col in tag_columns)
+            yield tag_tuple, timestamps[group_indices]
+
+    def _add_device(
+        self,
+        table_id: int,
+        tag_values: tuple,
+        timestamps: np.ndarray,
+    ):
+        """Add one device to the catalog."""
+        if len(timestamps) == 0:
+            return
+
+        self._catalog.add_device(table_id, tag_values, timestamps)
+
+    def _resolve_series_path(self, series_path: str) -> Tuple[int, int, int]:
+        return resolve_series_path(self._catalog, series_path)
+
+    def _resolve_series_ref(self, device_id: int, field_idx: int):
+        """Resolve a reader-local ref into the table/device metadata needed by read paths."""
+        device_entry = self._catalog.device_entries[device_id]
+        table_entry = self._catalog.table_entries[device_entry.table_id]
+        field_name = table_entry.field_columns[field_idx]
+        return table_entry, device_entry, field_name
+
+    def get_device_info(self, device_id: int) -> dict:
+        device_entry = self._catalog.device_entries[device_id]
+        table_entry = self._catalog.table_entries[device_entry.table_id]
+        return {
+            "table_name": table_entry.table_name,
+            "tag_columns": table_entry.tag_columns,
+            "tag_values": dict(zip(table_entry.tag_columns, device_entry.tag_values)),
+            "length": device_entry.length,
+            "min_time": device_entry.min_time,
+            "max_time": device_entry.max_time,
+        }
+
+    def get_device_timestamps(self, device_id: int) -> np.ndarray:
+        return self._catalog.device_entries[device_id].timestamps
+
+    def get_series_info_by_ref(self, device_id: int, field_idx: int) -> dict:
+        table_entry, device_entry, field_name = self._resolve_series_ref(device_id, field_idx)
+        return {
+            "length": device_entry.length,
+            "min_time": device_entry.min_time,
+            "max_time": device_entry.max_time,
+            "table_name": table_entry.table_name,
+            "column_name": field_name,
+            "device_id": device_id,
+            "field_idx": field_idx,
+            "tag_columns": table_entry.tag_columns,
+            "tag_values": dict(zip(table_entry.tag_columns, device_entry.tag_values)),
+        }
+
+    def get_series_info(self, series_path: str) -> dict:
+        device_id, field_idx = self._resolve_series_path(series_path)[1:]
+        return self.get_series_info_by_ref(device_id, field_idx)
+
+    def get_series_timestamps(self, series_path: str) -> np.ndarray:
+        device_id = self._resolve_series_path(series_path)[1]
+        return self.get_device_timestamps(device_id)
+
+    def read_series_by_ref(self, device_id: int, field_idx: int, start_time: int, end_time: int) -> Tuple[np.ndarray, np.ndarray]:
+        table_entry, _, field_name = self._resolve_series_ref(device_id, field_idx)
+        timestamps, field_values = self.read_device_fields_by_time_range(device_id, [field_idx], start_time, end_time)
+        if len(timestamps) == 0:
+            return np.array([], dtype=np.int64), np.array([], dtype=np.float64)
+        return timestamps, field_values[field_name]
+
+    def read_series_by_time_range(self, series_path: str, start_time: int, end_time: int) -> Tuple[np.ndarray, np.ndarray]:
+        _, device_id, field_idx = self._resolve_series_path(series_path)
+        return self.read_series_by_ref(device_id, field_idx, start_time, end_time)
+
+    def read_device_fields_by_time_range(
+        self, device_id: int, field_indices: List[int], start_time: int, end_time: int
+    ) -> Tuple[np.ndarray, Dict[str, np.ndarray]]:
+        """Read one device slice and return the requested field columns keyed by field name."""
+        device_entry = self._catalog.device_entries[device_id]
+        table_entry = self._catalog.table_entries[device_entry.table_id]
+        requested_field_columns = [table_entry.field_columns[field_idx] for field_idx in field_indices]
+        timestamps, field_values = self._read_arrow(
+            table_entry.table_name,
+            requested_field_columns,
+            table_entry.tag_columns,
+            dict(zip(table_entry.tag_columns, device_entry.tag_values)),
+            start_time,
+            end_time,
+        )
+        return timestamps, field_values
+
+    def _read_arrow(
+        self,
+        table_name: str,
+        field_columns: List[str],
+        tag_columns: Tuple[str, ...],
+        tag_values: Dict[str, object],
+        start_time: int,
+        end_time: int,
+    ) -> Tuple[np.ndarray, Dict[str, np.ndarray]]:
+        """Execute the underlying table query, then apply tag filtering client-side."""
+        tag_columns = list(tag_columns)
+        field_columns = list(field_columns)
+        query_columns = tag_columns + field_columns if tag_columns else list(field_columns)
+        timestamp_parts = []
+        field_parts = {field_column: [] for field_column in field_columns}
+
+        with self._reader.query_table_batch(
+            table_name,
+            query_columns,
+            start_time=start_time,
+            end_time=end_time,
+            batch_size=65536,
+        ) as result_set:
+            while True:
+                arrow_table = result_set.read_arrow_batch()
+                if arrow_table is None:
+                    break
+
+                if tag_values:
+                    mask = None
+                    for tag_column, tag_value in tag_values.items():
+                        column_mask = pc.equal(arrow_table.column(tag_column), tag_value)
+                        mask = column_mask if mask is None else pc.and_(mask, column_mask)
+                    arrow_table = arrow_table.filter(mask)
+
+                if arrow_table.num_rows == 0:
+                    continue
+
+                timestamp_parts.append(arrow_table.column("time").to_numpy())
+                for field_column in field_columns:
+                    raw_values = arrow_table.column(field_column).to_numpy()
+                    try:
+                        field_parts[field_column].append(np.asarray(raw_values, dtype=np.float64))
+                    except (TypeError, ValueError) as e:
+                        raise TypeError(
+                            f"Field column '{field_column}' in table '{table_name}' is not numeric-compatible."
+                        ) from e
+
+        if not timestamp_parts:
+            return (
+                np.array([], dtype=np.int64),
+                {field_column: np.array([], dtype=np.float64) for field_column in field_columns},
+            )
+
+        return (
+            np.concatenate(timestamp_parts).astype(np.int64),
+            {field_column: np.concatenate(field_parts[field_column]) for field_column in field_columns},
+        )
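A usage sketch for the reader backend; the file path and series names below are placeholders, not part of this patch:

    from tsfile.dataset.reader import TsFileSeriesReader

    reader = TsFileSeriesReader("example.tsfile", show_progress=False)
    try:
        first = reader.series_paths[0]        # e.g. 'weather.device_a.temperature'
        info = reader.get_series_info(first)  # length, min_time, max_time, tag values, ...
        ts, vals = reader.read_series_by_time_range(first, info["min_time"], info["max_time"])
    finally:
        reader.close()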
diff --git a/python/tsfile/dataset/timeseries.py b/python/tsfile/dataset/timeseries.py
new file mode 100644
index 00000000..35f1614d
--- /dev/null
+++ b/python/tsfile/dataset/timeseries.py
@@ -0,0 +1,155 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""Timeseries handles returned by the dataset package."""
+
+from typing import Callable, List, Optional, Tuple
+
+import numpy as np
+
+from .merge import merge_time_value_parts
+from .formatting import format_aligned_timeseries, format_timestamp
+
+
+class AlignedTimeseries:
+    """Time-aligned multi-series query result with timestamps.
+
+    Returned by ``TsFileDataFrame.loc[...]``. The values matrix is aligned on
+    the union of timestamps from the selected logical series.
+    """
+
+    def __init__(self, timestamps: np.ndarray, values: np.ndarray, series_names: List[str]):
+        self.timestamps = timestamps
+        self.values = values
+        self.series_names = series_names
+
+    @property
+    def shape(self):
+        return self.values.shape
+
+    def __len__(self):
+        return len(self.timestamps)
+
+    def __getitem__(self, key):
+        return self.values[key]
+
+    def __repr__(self):
+        return format_aligned_timeseries(self.timestamps, self.values, self.series_names, max_rows=20)
+
+    def show(self, max_rows: Optional[int] = None):
+        print(format_aligned_timeseries(self.timestamps, self.values, self.series_names, max_rows=max_rows))
+
+
+class Timeseries:
+    """Single logical numeric series with transparent cross-file merging.
+
+    Cross-shard reads follow the dataset merge policy defined in
+    :mod:`tsfile.dataset.merge`: duplicate timestamps across shards are treated
+    as an error rather than being merged implicitly.
+    """
+
+    def __init__(
+        self,
+        name: str,
+        series_refs: list,
+        stats: dict,
+        ensure_open: Callable[[], None],
+        load_timestamps: Callable[[], np.ndarray],
+    ):
+        self._name = name
+        self._series_refs = series_refs
+        self._stats = dict(stats)
+        self._ensure_open = ensure_open
+        self._load_timestamps = load_timestamps
+        self._timestamps = None
+
+    @property
+    def name(self) -> str:
+        return self._name
+
+    @property
+    def timestamps(self) -> np.ndarray:
+        self._ensure_open()
+        if self._timestamps is None:
+            self._timestamps = self._load_timestamps()
+        return self._timestamps
+
+    @property
+    def stats(self) -> dict:
+        return {
+            "start_time": self._stats.get("min_time"),
+            "end_time": self._stats.get("max_time"),
+            "count": self._stats["count"],
+        }
+
+    def __len__(self) -> int:
+        return self._stats["count"]
+
+    def __getitem__(self, key):
+        timestamps = self.timestamps
+        length = len(timestamps)
+
+        if isinstance(key, int):
+            if key < 0:
+                key += length
+            if key < 0 or key >= length:
+                raise IndexError(f"Index {key} out of range [0, {length})")
+            ts = int(timestamps[key])
+            _, values = self._query_time_range(ts, ts)
+            return float(values[0]) if len(values) > 0 else None
+
+        if isinstance(key, slice):
+            requested_ts = timestamps[key]
+            if len(requested_ts) == 0:
+                return np.array([], dtype=np.float64)
+
+            ts_arr, values = self._query_time_range(int(np.min(requested_ts)), int(np.max(requested_ts)))
+            result = np.full(len(requested_ts), np.nan)
+            if len(ts_arr) > 0:
+                indices = np.searchsorted(ts_arr, requested_ts)
+                valid = (indices < len(ts_arr)) & (
+                    ts_arr[np.minimum(indices, len(ts_arr) - 1)] == requested_ts
+                )
+                result[valid] = values[indices[valid]]
+            return result
+
+        raise TypeError(f"Unsupported key type: {type(key)}")
+
+    def _query_time_range(self, start_time: int, end_time: int) -> Tuple[np.ndarray, np.ndarray]:
+        self._ensure_open()
+        time_parts = []
+        value_parts = []
+        for reader, device_id, field_idx in self._series_refs:
+            device_timestamps = reader.get_device_timestamps(device_id)
+            if device_timestamps[-1] < start_time or device_timestamps[0] > end_time:
+                continue
+            ts_arr, val_arr = reader.read_series_by_ref(device_id, field_idx, start_time, end_time)
+            if len(ts_arr) > 0:
+                time_parts.append(ts_arr)
+                value_parts.append(val_arr)
+        return merge_time_value_parts(time_parts, value_parts)
+
+    def __repr__(self):
+        stats = self.stats
+        if stats["count"] == 0:
+            return f"Timeseries('{self._name}', count=0)"
+        return (
+            f"Timeseries('{self._name}', count={stats['count']}, "
+            f"start={format_timestamp(stats['start_time'])}, "
+            f"end={format_timestamp(stats['end_time'])})"
+        )
