JackieTien97 commented on code in PR #765:
URL: https://github.com/apache/tsfile/pull/765#discussion_r3031772614


##########
python/tsfile/tsfile_dataframe.py:
##########
@@ -0,0 +1,859 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+TsFileDataFrame - Lazy-loaded unified view over multiple TsFile files.
+
+Provides array-like and DataFrame-like access to time series data
+stored in TsFile format. Supports:
+1. Pre-training: random index access with array-like slicing
+2. Post-training: time-aligned multi-series queries via .loc
+"""
+
+import os
+import sys
+from collections import defaultdict
+from typing import List, Dict, Union, Optional, Tuple
+from datetime import datetime
+
+import numpy as np
+
+
+def _format_timestamp(ts_ms: int) -> str:
+    """Convert millisecond timestamp to human-readable string."""
+    try:
+        return datetime.fromtimestamp(ts_ms / 1000).strftime('%Y-%m-%d 
%H:%M:%S')
+    except (OSError, ValueError):
+        return str(ts_ms)
+
+
+class AlignedTimeseries:
+    """
+    Time-aligned multi-series query result with timestamps.
+
+    Returned by .loc[...] and df[slice/list]. Supports:
+    - result.timestamps -> np.ndarray of ms timestamps
+    - result.values -> np.ndarray of shape (rows, cols)
+    - result.series_names -> list of series name strings
+    - result[i] / result[i, j] -> index into values
+    - print(result) -> truncated table (20 rows)
+    - result.show() -> full table (no truncation)
+    - result.show(50) -> show up to 50 rows
+    """
+
+    def __init__(self, timestamps: np.ndarray, values: np.ndarray, 
series_names: List[str]):
+        self.timestamps = timestamps
+        self.values = values
+        self.series_names = series_names
+
+    @property
+    def shape(self):
+        return self.values.shape
+
+    def __len__(self):
+        return len(self.timestamps)
+
+    def __getitem__(self, key):
+        return self.values[key]
+
+    def _build_display(self):
+        """Pre-compute string representations for display."""
+        n_rows, n_cols = self.values.shape
+        ts_strs = [_format_timestamp(int(t)) for t in self.timestamps]
+        ts_width = max((len(s) for s in ts_strs), default=0)
+        ts_width = max(ts_width, len('timestamp'))
+
+        col_widths = []
+        val_strs = []
+        for col_idx in range(n_cols):
+            col_name = self.series_names[col_idx] if col_idx < 
len(self.series_names) else f'col_{col_idx}'
+            w = len(col_name)
+            col_vals = []
+            for row_idx in range(n_rows):
+                v = self.values[row_idx, col_idx]
+                s = 'NaN' if np.isnan(v) else f'{v:.2f}'
+                col_vals.append(s)
+                w = max(w, len(s))
+            val_strs.append(col_vals)
+            col_widths.append(w)
+
+        return ts_strs, ts_width, col_widths, val_strs
+
+    def _format_rows(self, ts_strs, ts_width, col_widths, val_strs, max_rows):
+        """Format rows with optional truncation."""
+        n_rows = len(ts_strs)
+        n_cols = len(col_widths)
+
+        header_parts = ['timestamp'.rjust(ts_width)]
+        for col_idx in range(n_cols):
+            col_name = self.series_names[col_idx] if col_idx < 
len(self.series_names) else f'col_{col_idx}'
+            header_parts.append(col_name.rjust(col_widths[col_idx]))
+        lines = ['  '.join(header_parts)]
+
+        if max_rows is None or n_rows <= max_rows:
+            show_rows = list(range(n_rows))
+        else:
+            show_rows = list(range(max_rows))
+
+        for row_idx in show_rows:
+            parts = [ts_strs[row_idx].rjust(ts_width)]
+            for col_idx in range(n_cols):
+                
parts.append(val_strs[col_idx][row_idx].rjust(col_widths[col_idx]))
+            lines.append('  '.join(parts))
+
+        return f"AlignedTimeseries({n_rows} rows, {n_cols} series)\n" + 
'\n'.join(lines)
+
+    def __repr__(self):
+        n_rows, n_cols = self.values.shape
+        if n_rows == 0:
+            return f"AlignedTimeseries(0 rows, {n_cols} series)"
+        ts_strs, ts_width, col_widths, val_strs = self._build_display()
+        return self._format_rows(ts_strs, ts_width, col_widths, val_strs, 
max_rows=20)
+
+    def show(self, max_rows: Optional[int] = None):
+        """Print formatted table with configurable row limit.
+
+        Args:
+            max_rows: Maximum rows to display. None for all rows.
+        """
+        n_rows, n_cols = self.values.shape
+        if n_rows == 0:
+            print(f"AlignedTimeseries(0 rows, {n_cols} series)")
+            return
+        ts_strs, ts_width, col_widths, val_strs = self._build_display()
+        print(self._format_rows(ts_strs, ts_width, col_widths, val_strs, 
max_rows))
+
+
+class Timeseries:
+    """
+    Single time series abstraction.
+
+    Supports row-based slicing (converting to time-range queries internally),
+    stats access, and length queries.
+
+    When a series spans multiple files, data is merged transparently.
+    """
+
+    def __init__(self, name: str, readers_and_infos: list, merged_timestamps: 
np.ndarray):
+        """
+        Args:
+            name: Full series path (e.g., "weather.Beijing.humidity").
+            readers_and_infos: List of (reader, series_info) tuples for this 
series.
+            merged_timestamps: Sorted, deduplicated timestamp array across all 
files.
+        """
+        self._name = name
+        self._readers_and_infos = readers_and_infos
+        self._timestamps = merged_timestamps
+
+    @property
+    def name(self) -> str:
+        return self._name
+
+    @property
+    def timestamps(self) -> np.ndarray:
+        return self._timestamps
+
+    @property
+    def stats(self) -> dict:
+        count = len(self._timestamps)
+        if count == 0:
+            return {'start_time': None, 'end_time': None, 'count': 0}
+        return {
+            'start_time': int(self._timestamps[0]),
+            'end_time': int(self._timestamps[-1]),
+            'count': count,
+        }
+
+    def __len__(self) -> int:
+        return len(self._timestamps)
+
+    def __getitem__(self, key):
+        """Row-based access.
+
+        - series[20] -> single float value
+        - series[20:100] -> np.ndarray of values
+        """
+        length = len(self._timestamps)
+
+        if isinstance(key, int):
+            if key < 0:
+                key = length + key
+            if key < 0 or key >= length:
+                raise IndexError(f"Index {key} out of range [0, {length})")
+            ts = int(self._timestamps[key])
+            _, vals = self._query_time_range(ts, ts)
+            return float(vals[0]) if len(vals) > 0 else None
+
+        elif isinstance(key, slice):
+            start, stop, step = key.indices(length)
+            if start >= stop:
+                return np.array([], dtype=np.float64)
+
+            # Get exact timestamps for the requested rows
+            requested_ts = self._timestamps[start:stop]
+            if len(requested_ts) == 0:
+                return np.array([], dtype=np.float64)
+
+            # Query by time range, then filter to exact timestamps
+            start_ts = int(requested_ts[0])
+            end_ts = int(requested_ts[-1])
+            ts_arr, vals = self._query_time_range(start_ts, end_ts)
+
+            # Vectorized alignment: both ts_arr and requested_ts are sorted
+            result = np.full(len(requested_ts), np.nan)
+            if len(ts_arr) > 0:
+                indices = np.searchsorted(ts_arr, requested_ts)
+                valid = (indices < len(ts_arr)) & (ts_arr[np.minimum(indices, 
len(ts_arr) - 1)] == requested_ts)
+                result[valid] = vals[indices[valid]]
+            vals = result
+
+            if step != 1:
+                vals = vals[::step]
+            return vals
+
+        else:
+            raise TypeError(f"Unsupported key type: {type(key)}")
+
+    def _query_time_range(self, start_time: int, end_time: int) -> 
Tuple[np.ndarray, np.ndarray]:
+        """Query all readers for this series in the given time range, merge 
results."""
+        all_ts = []
+        all_vals = []
+        for reader, info in self._readers_and_infos:
+            # Skip reader if its data doesn't overlap
+            if info['max_time'] < start_time or info['min_time'] > end_time:
+                continue
+            ts_arr, val_arr = reader.read_series_by_time_range(
+                self._name, start_time, end_time
+            )
+            if len(ts_arr) > 0:
+                all_ts.append(ts_arr)
+                all_vals.append(val_arr)
+
+        if not all_ts:
+            return np.array([], dtype=np.int64), np.array([], dtype=np.float64)
+
+        if len(all_ts) == 1:
+            return all_ts[0], all_vals[0]
+
+        # Merge from multiple readers, sort by timestamp, deduplicate
+        merged_ts = np.concatenate(all_ts)
+        merged_vals = np.concatenate(all_vals)
+        sort_idx = np.argsort(merged_ts, kind='mergesort')
+        merged_ts = merged_ts[sort_idx]
+        merged_vals = merged_vals[sort_idx]
+
+        # Deduplicate by timestamp (keep first occurrence)
+        _, unique_idx = np.unique(merged_ts, return_index=True)
+        return merged_ts[unique_idx], merged_vals[unique_idx]
+
+    def __repr__(self):
+        stats = self.stats
+        if stats['count'] == 0:
+            return f"Timeseries('{self._name}', count=0)"
+        return (
+            f"Timeseries('{self._name}', count={stats['count']}, "
+            f"start={_format_timestamp(stats['start_time'])}, "
+            f"end={_format_timestamp(stats['end_time'])})"
+        )
+
+
+class _LocIndexer:
+    """
+    Implements .loc[start_time:end_time, series_list] for time-aligned queries.
+
+    Returns AlignedTimeseries with timestamps, values, and series names.
+    """
+
+    def __init__(self, dataframe: 'TsFileDataFrame'):
+        self._df = dataframe
+
+    def _parse_key(self, key):
+        """Parse key into (start_time, end_time, series_names)."""
+        if not isinstance(key, tuple) or len(key) != 2:
+            raise ValueError(
+                "loc requires exactly 2 arguments: "
+                "tsdf.loc[start_time:end_time, series_list]"
+            )
+
+        time_slice, series_spec = key
+
+        # Parse time range
+        if isinstance(time_slice, slice):
+            start_time = time_slice.start
+            end_time = time_slice.stop
+            if start_time is None:
+                start_time = np.iinfo(np.int64).min
+            if end_time is None:
+                end_time = np.iinfo(np.int64).max
+        elif isinstance(time_slice, (int, np.integer)):
+            start_time = end_time = int(time_slice)
+        else:
+            raise TypeError(f"Time index must be slice or int, got 
{type(time_slice)}")
+
+        # Parse series names
+        if isinstance(series_spec, (str, int)):
+            series_spec = [series_spec]
+
+        series_names = []
+        for s in series_spec:
+            if isinstance(s, (int, np.integer)):
+                idx = int(s)
+                if idx < 0 or idx >= len(self._df._series_list):
+                    raise IndexError(f"Series index {idx} out of range")

Review Comment:
   Negative indices are not normalized (no `idx = length + idx` like other 
`__getitem__` methods in this file). A user passing `df.loc[:, [-1]]` would get 
an IndexError instead of the last series.



##########
python/tsfile/tsfile_dataframe.py:
##########
@@ -0,0 +1,859 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+TsFileDataFrame - Lazy-loaded unified view over multiple TsFile files.
+
+Provides array-like and DataFrame-like access to time series data
+stored in TsFile format. Supports:
+1. Pre-training: random index access with array-like slicing
+2. Post-training: time-aligned multi-series queries via .loc
+"""
+
+import os
+import sys
+from collections import defaultdict
+from typing import List, Dict, Union, Optional, Tuple
+from datetime import datetime
+
+import numpy as np
+
+
+def _format_timestamp(ts_ms: int) -> str:
+    """Convert millisecond timestamp to human-readable string."""
+    try:
+        return datetime.fromtimestamp(ts_ms / 1000).strftime('%Y-%m-%d 
%H:%M:%S')
+    except (OSError, ValueError):
+        return str(ts_ms)
+
+
+class AlignedTimeseries:
+    """
+    Time-aligned multi-series query result with timestamps.
+
+    Returned by .loc[...] and df[slice/list]. Supports:
+    - result.timestamps -> np.ndarray of ms timestamps
+    - result.values -> np.ndarray of shape (rows, cols)
+    - result.series_names -> list of series name strings
+    - result[i] / result[i, j] -> index into values
+    - print(result) -> truncated table (20 rows)
+    - result.show() -> full table (no truncation)
+    - result.show(50) -> show up to 50 rows
+    """
+
+    def __init__(self, timestamps: np.ndarray, values: np.ndarray, 
series_names: List[str]):
+        self.timestamps = timestamps
+        self.values = values
+        self.series_names = series_names
+
+    @property
+    def shape(self):
+        return self.values.shape
+
+    def __len__(self):
+        return len(self.timestamps)
+
+    def __getitem__(self, key):
+        return self.values[key]
+
+    def _build_display(self):
+        """Pre-compute string representations for display."""
+        n_rows, n_cols = self.values.shape
+        ts_strs = [_format_timestamp(int(t)) for t in self.timestamps]
+        ts_width = max((len(s) for s in ts_strs), default=0)
+        ts_width = max(ts_width, len('timestamp'))
+
+        col_widths = []
+        val_strs = []
+        for col_idx in range(n_cols):
+            col_name = self.series_names[col_idx] if col_idx < 
len(self.series_names) else f'col_{col_idx}'
+            w = len(col_name)
+            col_vals = []
+            for row_idx in range(n_rows):
+                v = self.values[row_idx, col_idx]
+                s = 'NaN' if np.isnan(v) else f'{v:.2f}'
+                col_vals.append(s)
+                w = max(w, len(s))
+            val_strs.append(col_vals)
+            col_widths.append(w)
+
+        return ts_strs, ts_width, col_widths, val_strs
+
+    def _format_rows(self, ts_strs, ts_width, col_widths, val_strs, max_rows):
+        """Format rows with optional truncation."""
+        n_rows = len(ts_strs)
+        n_cols = len(col_widths)
+
+        header_parts = ['timestamp'.rjust(ts_width)]
+        for col_idx in range(n_cols):
+            col_name = self.series_names[col_idx] if col_idx < 
len(self.series_names) else f'col_{col_idx}'
+            header_parts.append(col_name.rjust(col_widths[col_idx]))
+        lines = ['  '.join(header_parts)]
+
+        if max_rows is None or n_rows <= max_rows:
+            show_rows = list(range(n_rows))
+        else:
+            show_rows = list(range(max_rows))

Review Comment:
   When `max_rows` is exceeded, only the first `max_rows` rows are shown. In 
contrast, `TsFileDataFrame._format_table` shows head + "..." + tail. This 
inconsistency may confuse users expecting similar behavior from both display 
methods.



##########
python/tsfile/tsfile_series_reader.py:
##########
@@ -0,0 +1,440 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+High-performance TsFile Series Reader
+
+Optimized for time series data reading using the Arrow columnar API.
+Wraps TsFileReaderPy (Cython) and provides series-level metadata
+discovery, timestamp caching, and batch reads with TAG filtering.
+"""
+
+import os
+import sys
+from typing import List, Dict, Optional, Tuple
+
+import numpy as np
+import pyarrow.compute as pc
+
+from .constants import ColumnCategory
+from .tsfile_reader import TsFileReaderPy
+
+
+class TsFileSeriesReader:
+    """
+    TsFile Series Reader
+
+    Wrapper around the Cython TsFileReaderPy for reading TsFile data
+    at the series level. Supports TAG columns: a time series is uniquely
+    identified by Table + Tag values + Field column, producing series
+    paths like "weather.beijing.humidity".
+    """
+
+    def __init__(self, file_path: str, show_progress: bool = True):
+        if not os.path.exists(file_path):
+            raise FileNotFoundError(f"TsFile not found: {file_path}")
+
+        self.file_path = file_path
+        self.show_progress = show_progress
+
+        try:
+            self._reader = TsFileReaderPy(file_path)
+        except Exception as e:
+            raise ValueError(f"Failed to open TsFile: {e}")
+
+        self.series_paths: List[str] = []
+        self.series_info: Dict[str, dict] = {}
+        self._timestamps_cache: Dict[str, np.ndarray] = {}
+        self._series_data_cache: Dict[str, np.ndarray] = {}
+
+        self._cache_metadata()
+
+    def __del__(self):
+        self.close()
+
+    def close(self):
+        """Close the underlying Cython reader."""
+        if hasattr(self, '_reader'):
+            try:
+                self._reader.close()
+            except Exception:
+                pass
+
+    def _cache_metadata(self):
+        """Cache metadata from the TsFile."""
+        try:
+            self._cache_metadata_table_model()
+        except Exception as e:
+            raise ValueError(
+                f"Failed to read TsFile metadata. "
+                f"Please ensure the TsFile is valid and readable. Error: {e}"
+            )

Review Comment:
   The original traceback is lost. Use `raise ValueError(...) from e` to 
preserve the exception chain for debugging.



##########
python/tsfile/tsfile_dataframe.py:
##########
@@ -0,0 +1,859 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+TsFileDataFrame - Lazy-loaded unified view over multiple TsFile files.
+
+Provides array-like and DataFrame-like access to time series data
+stored in TsFile format. Supports:
+1. Pre-training: random index access with array-like slicing
+2. Post-training: time-aligned multi-series queries via .loc
+"""
+
+import os
+import sys
+from collections import defaultdict
+from typing import List, Dict, Union, Optional, Tuple
+from datetime import datetime
+
+import numpy as np
+
+
+def _format_timestamp(ts_ms: int) -> str:
+    """Convert millisecond timestamp to human-readable string."""
+    try:
+        return datetime.fromtimestamp(ts_ms / 1000).strftime('%Y-%m-%d 
%H:%M:%S')
+    except (OSError, ValueError):
+        return str(ts_ms)
+
+
+class AlignedTimeseries:
+    """
+    Time-aligned multi-series query result with timestamps.
+
+    Returned by .loc[...] and df[slice/list]. Supports:
+    - result.timestamps -> np.ndarray of ms timestamps
+    - result.values -> np.ndarray of shape (rows, cols)
+    - result.series_names -> list of series name strings
+    - result[i] / result[i, j] -> index into values
+    - print(result) -> truncated table (20 rows)
+    - result.show() -> full table (no truncation)
+    - result.show(50) -> show up to 50 rows
+    """
+
+    def __init__(self, timestamps: np.ndarray, values: np.ndarray, 
series_names: List[str]):
+        self.timestamps = timestamps
+        self.values = values
+        self.series_names = series_names
+
+    @property
+    def shape(self):
+        return self.values.shape
+
+    def __len__(self):
+        return len(self.timestamps)
+
+    def __getitem__(self, key):
+        return self.values[key]
+
+    def _build_display(self):
+        """Pre-compute string representations for display."""
+        n_rows, n_cols = self.values.shape
+        ts_strs = [_format_timestamp(int(t)) for t in self.timestamps]
+        ts_width = max((len(s) for s in ts_strs), default=0)
+        ts_width = max(ts_width, len('timestamp'))
+
+        col_widths = []
+        val_strs = []
+        for col_idx in range(n_cols):
+            col_name = self.series_names[col_idx] if col_idx < 
len(self.series_names) else f'col_{col_idx}'
+            w = len(col_name)
+            col_vals = []
+            for row_idx in range(n_rows):
+                v = self.values[row_idx, col_idx]
+                s = 'NaN' if np.isnan(v) else f'{v:.2f}'
+                col_vals.append(s)
+                w = max(w, len(s))
+            val_strs.append(col_vals)
+            col_widths.append(w)
+
+        return ts_strs, ts_width, col_widths, val_strs
+
+    def _format_rows(self, ts_strs, ts_width, col_widths, val_strs, max_rows):
+        """Format rows with optional truncation."""
+        n_rows = len(ts_strs)
+        n_cols = len(col_widths)
+
+        header_parts = ['timestamp'.rjust(ts_width)]
+        for col_idx in range(n_cols):
+            col_name = self.series_names[col_idx] if col_idx < 
len(self.series_names) else f'col_{col_idx}'
+            header_parts.append(col_name.rjust(col_widths[col_idx]))
+        lines = ['  '.join(header_parts)]
+
+        if max_rows is None or n_rows <= max_rows:
+            show_rows = list(range(n_rows))
+        else:
+            show_rows = list(range(max_rows))
+
+        for row_idx in show_rows:
+            parts = [ts_strs[row_idx].rjust(ts_width)]
+            for col_idx in range(n_cols):
+                
parts.append(val_strs[col_idx][row_idx].rjust(col_widths[col_idx]))
+            lines.append('  '.join(parts))
+
+        return f"AlignedTimeseries({n_rows} rows, {n_cols} series)\n" + 
'\n'.join(lines)
+
+    def __repr__(self):
+        n_rows, n_cols = self.values.shape
+        if n_rows == 0:
+            return f"AlignedTimeseries(0 rows, {n_cols} series)"
+        ts_strs, ts_width, col_widths, val_strs = self._build_display()
+        return self._format_rows(ts_strs, ts_width, col_widths, val_strs, 
max_rows=20)
+
+    def show(self, max_rows: Optional[int] = None):
+        """Print formatted table with configurable row limit.
+
+        Args:
+            max_rows: Maximum rows to display. None for all rows.
+        """
+        n_rows, n_cols = self.values.shape
+        if n_rows == 0:
+            print(f"AlignedTimeseries(0 rows, {n_cols} series)")
+            return
+        ts_strs, ts_width, col_widths, val_strs = self._build_display()
+        print(self._format_rows(ts_strs, ts_width, col_widths, val_strs, 
max_rows))
+
+
+class Timeseries:
+    """
+    Single time series abstraction.
+
+    Supports row-based slicing (converting to time-range queries internally),
+    stats access, and length queries.
+
+    When a series spans multiple files, data is merged transparently.
+    """
+
+    def __init__(self, name: str, readers_and_infos: list, merged_timestamps: 
np.ndarray):
+        """
+        Args:
+            name: Full series path (e.g., "weather.Beijing.humidity").
+            readers_and_infos: List of (reader, series_info) tuples for this 
series.
+            merged_timestamps: Sorted, deduplicated timestamp array across all 
files.
+        """
+        self._name = name
+        self._readers_and_infos = readers_and_infos
+        self._timestamps = merged_timestamps
+
+    @property
+    def name(self) -> str:
+        return self._name
+
+    @property
+    def timestamps(self) -> np.ndarray:
+        return self._timestamps
+
+    @property
+    def stats(self) -> dict:
+        count = len(self._timestamps)
+        if count == 0:
+            return {'start_time': None, 'end_time': None, 'count': 0}
+        return {
+            'start_time': int(self._timestamps[0]),
+            'end_time': int(self._timestamps[-1]),
+            'count': count,
+        }
+
+    def __len__(self) -> int:
+        return len(self._timestamps)
+
+    def __getitem__(self, key):
+        """Row-based access.
+
+        - series[20] -> single float value
+        - series[20:100] -> np.ndarray of values
+        """
+        length = len(self._timestamps)
+
+        if isinstance(key, int):
+            if key < 0:
+                key = length + key
+            if key < 0 or key >= length:
+                raise IndexError(f"Index {key} out of range [0, {length})")
+            ts = int(self._timestamps[key])
+            _, vals = self._query_time_range(ts, ts)
+            return float(vals[0]) if len(vals) > 0 else None
+
+        elif isinstance(key, slice):
+            start, stop, step = key.indices(length)
+            if start >= stop:
+                return np.array([], dtype=np.float64)
+
+            # Get exact timestamps for the requested rows
+            requested_ts = self._timestamps[start:stop]
+            if len(requested_ts) == 0:
+                return np.array([], dtype=np.float64)
+
+            # Query by time range, then filter to exact timestamps
+            start_ts = int(requested_ts[0])
+            end_ts = int(requested_ts[-1])
+            ts_arr, vals = self._query_time_range(start_ts, end_ts)
+
+            # Vectorized alignment: both ts_arr and requested_ts are sorted
+            result = np.full(len(requested_ts), np.nan)
+            if len(ts_arr) > 0:
+                indices = np.searchsorted(ts_arr, requested_ts)
+                valid = (indices < len(ts_arr)) & (ts_arr[np.minimum(indices, 
len(ts_arr) - 1)] == requested_ts)
+                result[valid] = vals[indices[valid]]
+            vals = result
+
+            if step != 1:
+                vals = vals[::step]
+            return vals
+
+        else:
+            raise TypeError(f"Unsupported key type: {type(key)}")
+
+    def _query_time_range(self, start_time: int, end_time: int) -> 
Tuple[np.ndarray, np.ndarray]:
+        """Query all readers for this series in the given time range, merge 
results."""
+        all_ts = []
+        all_vals = []
+        for reader, info in self._readers_and_infos:
+            # Skip reader if its data doesn't overlap
+            if info['max_time'] < start_time or info['min_time'] > end_time:
+                continue
+            ts_arr, val_arr = reader.read_series_by_time_range(
+                self._name, start_time, end_time
+            )
+            if len(ts_arr) > 0:
+                all_ts.append(ts_arr)
+                all_vals.append(val_arr)
+
+        if not all_ts:
+            return np.array([], dtype=np.int64), np.array([], dtype=np.float64)
+
+        if len(all_ts) == 1:
+            return all_ts[0], all_vals[0]
+
+        # Merge from multiple readers, sort by timestamp, deduplicate
+        merged_ts = np.concatenate(all_ts)
+        merged_vals = np.concatenate(all_vals)
+        sort_idx = np.argsort(merged_ts, kind='mergesort')
+        merged_ts = merged_ts[sort_idx]
+        merged_vals = merged_vals[sort_idx]
+
+        # Deduplicate by timestamp (keep first occurrence)
+        _, unique_idx = np.unique(merged_ts, return_index=True)
+        return merged_ts[unique_idx], merged_vals[unique_idx]
+
+    def __repr__(self):
+        stats = self.stats
+        if stats['count'] == 0:
+            return f"Timeseries('{self._name}', count=0)"
+        return (
+            f"Timeseries('{self._name}', count={stats['count']}, "
+            f"start={_format_timestamp(stats['start_time'])}, "
+            f"end={_format_timestamp(stats['end_time'])})"
+        )
+
+
+class _LocIndexer:
+    """
+    Implements .loc[start_time:end_time, series_list] for time-aligned queries.
+
+    Returns AlignedTimeseries with timestamps, values, and series names.
+    """
+
+    def __init__(self, dataframe: 'TsFileDataFrame'):
+        self._df = dataframe
+
+    def _parse_key(self, key):
+        """Parse key into (start_time, end_time, series_names)."""
+        if not isinstance(key, tuple) or len(key) != 2:
+            raise ValueError(
+                "loc requires exactly 2 arguments: "
+                "tsdf.loc[start_time:end_time, series_list]"
+            )
+
+        time_slice, series_spec = key
+
+        # Parse time range
+        if isinstance(time_slice, slice):
+            start_time = time_slice.start
+            end_time = time_slice.stop
+            if start_time is None:
+                start_time = np.iinfo(np.int64).min
+            if end_time is None:
+                end_time = np.iinfo(np.int64).max
+        elif isinstance(time_slice, (int, np.integer)):
+            start_time = end_time = int(time_slice)
+        else:
+            raise TypeError(f"Time index must be slice or int, got 
{type(time_slice)}")
+
+        # Parse series names
+        if isinstance(series_spec, (str, int)):
+            series_spec = [series_spec]
+
+        series_names = []
+        for s in series_spec:
+            if isinstance(s, (int, np.integer)):
+                idx = int(s)
+                if idx < 0 or idx >= len(self._df._series_list):
+                    raise IndexError(f"Series index {idx} out of range")
+                series_names.append(self._df._series_list[idx])
+            elif isinstance(s, str):
+                if s not in self._df._series_map:
+                    raise KeyError(f"Series not found: {s}")
+                series_names.append(s)
+            else:
+                raise TypeError(f"Series specifier must be int or str, got 
{type(s)}")
+
+        return start_time, end_time, series_names
+
+    def _query_aligned(self, start_time: int, end_time: int, series_names: 
List[str]):
+        """Query and align multiple series using batched Arrow reads."""
+        # Group series by (reader_id, table_name, tag_tuple) for batch queries.
+        # Each group can be fetched with a single query_table_batch call.
+        groups = defaultdict(list)  # key -> [(col_idx, field_name, 
series_name, reader, info)]
+
+        for col_idx, name in enumerate(series_names):
+            entries = self._df._series_map[name]
+            for reader, info in entries:
+                if info['max_time'] < start_time or info['min_time'] > 
end_time:
+                    continue
+                key = (id(reader), info['table_name'],
+                       tuple(sorted(info['tag_values'].items())))
+                groups[key].append((col_idx, info['column_name'], name, 
reader, info))
+
+        # Fetch data: one query per group
+        series_data = {}  # series_name -> (ts_arr, val_arr)
+
+        for key, entries in groups.items():
+            reader = entries[0][3]
+            info = entries[0][4]
+            field_columns = list(dict.fromkeys(e[1] for e in entries))  # 
dedupe, keep order
+
+            ts_arr, field_vals = reader.read_multi_series_by_time_range(
+                info['table_name'], field_columns,
+                info['tag_columns'], info['tag_values'],
+                start_time, end_time,
+            )
+
+            for _col_idx, field_name, name, _, _ in entries:
+                if name in series_data:
+                    # Series spans multiple readers: merge
+                    prev_ts, prev_val = series_data[name]
+                    merged_ts = np.concatenate([prev_ts, ts_arr])
+                    merged_val = np.concatenate([prev_val, 
field_vals[field_name]])
+                    sort_idx = np.argsort(merged_ts, kind='mergesort')
+                    merged_ts = merged_ts[sort_idx]
+                    merged_val = merged_val[sort_idx]
+                    _, unique_idx = np.unique(merged_ts, return_index=True)
+                    series_data[name] = (merged_ts[unique_idx], 
merged_val[unique_idx])
+                else:
+                    series_data[name] = (ts_arr, field_vals[field_name])
+
+        # Collect unique timestamps using numpy
+        all_ts_arrays = [data[0] for data in series_data.values() if 
len(data[0]) > 0]
+        if not all_ts_arrays:
+            return (np.array([], dtype=np.int64),
+                    np.array([]).reshape(0, len(series_names)))
+
+        sorted_timestamps = np.unique(np.concatenate(all_ts_arrays))
+
+        # Build result matrix using np.searchsorted for vectorized alignment
+        result = np.full((len(sorted_timestamps), len(series_names)), np.nan)
+
+        for col_idx, name in enumerate(series_names):
+            if name not in series_data:
+                continue
+            ts_arr, val_arr = series_data[name]
+            if len(ts_arr) == 0:
+                continue
+            indices = np.searchsorted(sorted_timestamps, ts_arr)
+            result[indices, col_idx] = val_arr
+
+        return sorted_timestamps, result
+
+    def __getitem__(self, key) -> 'AlignedTimeseries':
+        start_time, end_time, series_names = self._parse_key(key)
+        timestamps, values = self._query_aligned(start_time, end_time, 
series_names)
+        return AlignedTimeseries(timestamps, values, series_names)
+
+
+class TsFileDataFrame:
+    """
+    Lazy-loaded unified view over multiple TsFile files.
+
+    Each dimension is a time series identified by Table + Tags + Field.
+    Supports cross-file merging of same-named series.
+    """
+
+    def __init__(self, paths: Union[str, List[str]], show_progress: bool = 
True):
+        if isinstance(paths, str):
+            paths = [paths]
+
+        # Expand directories: collect all .tsfile files within each directory
+        expanded = []
+        for p in paths:
+            if os.path.isdir(p):
+                tsfiles = sorted(
+                    os.path.join(root, f)
+                    for root, _, files in os.walk(p)
+                    for f in files
+                    if f.endswith('.tsfile')
+                )
+                if not tsfiles:
+                    raise FileNotFoundError(
+                        f"No .tsfile files found in directory: {p}"
+                    )
+                expanded.extend(tsfiles)
+            else:
+                expanded.append(p)
+
+        self._paths = []
+        for p in expanded:
+            if not os.path.exists(p):
+                raise FileNotFoundError(f"TsFile not found: {p}")
+            self._paths.append(os.path.abspath(p))
+
+        self._show_progress = show_progress
+        self._readers = {}
+        self._series_list: List[str] = []
+        self._series_map: Dict[str, list] = {}  # series_path -> [(reader, 
info), ...]
+        self._merged_timestamps: Dict[str, np.ndarray] = {}
+        self._merged_info: Dict[str, dict] = {}
+        self._name_to_index: Dict[str, int] = {}
+
+        self._is_view = False
+        self._root = None
+
+        self._load_metadata()
+
+    @classmethod
+    def _from_subset(cls, parent: 'TsFileDataFrame', series_names: List[str]) 
-> 'TsFileDataFrame':
+        """Create a lightweight view over a subset of series.
+
+        Shares readers, series_map, merged_timestamps, and merged_info
+        with the root. Does NOT own any readers and will not close them.
+        """
+        obj = object.__new__(cls)
+        obj._root = parent._root if parent._is_view else parent
+        obj._is_view = True
+        obj._paths = parent._paths
+        obj._readers = parent._readers
+        obj._series_map = parent._series_map
+        obj._merged_timestamps = parent._merged_timestamps
+        obj._merged_info = parent._merged_info
+        obj._show_progress = parent._show_progress
+        obj._series_list = list(series_names)
+        obj._name_to_index = {name: i for i, name in enumerate(series_names)}
+        return obj
+
+    def _load_metadata(self):
+        """Load metadata from all TsFile files."""
+        from .tsfile_series_reader import TsFileSeriesReader
+
+        if len(self._paths) >= 2:
+            self._load_metadata_parallel(TsFileSeriesReader)
+        else:
+            self._load_metadata_serial(TsFileSeriesReader)
+
+        # Build ordered series list and merged metadata
+        seen = set()
+        for file_path in self._paths:
+            reader = self._readers[file_path]
+            for series_path in reader.series_paths:
+                if series_path not in seen:
+                    seen.add(series_path)
+                    self._series_list.append(series_path)
+                    self._name_to_index[series_path] = len(self._series_list) 
- 1
+                    self._build_merged_info(series_path)
+
+        if not self._series_list:
+            raise ValueError("No valid time series found in the provided 
TsFile files")
+
+    def _load_metadata_serial(self, ReaderClass):
+        """Load metadata from files sequentially."""
+        for file_path in self._paths:
+            reader = ReaderClass(file_path, show_progress=self._show_progress)
+            self._register_reader(file_path, reader)
+
+    def _load_metadata_parallel(self, ReaderClass):
+        """Load metadata from files in parallel using threads."""
+        from concurrent.futures import ThreadPoolExecutor, as_completed
+
+        def _open_file(file_path):
+            return file_path, ReaderClass(file_path, show_progress=False)
+
+        num_workers = min(len(self._paths), os.cpu_count() or 4)
+        total = len(self._paths)
+        with ThreadPoolExecutor(max_workers=num_workers) as executor:
+            futures = {executor.submit(_open_file, p): p for p in self._paths}
+            results = {}
+            done_count = 0
+            for future in as_completed(futures):
+                file_path, reader = future.result()
+                results[file_path] = reader
+                done_count += 1
+                if self._show_progress:
+                    sys.stderr.write(
+                        f"\rLoading TsFile shards: "
+                        f"{done_count}/{total}"
+                    )
+                    sys.stderr.flush()
+
+        if self._show_progress and total > 0:
+            total_series = sum(len(r.series_paths) for r in results.values())
+            sys.stderr.write(
+                f"\rLoading TsFile shards: "
+                f"{total}/{total} "
+                f"({total_series} series) "
+                f"... done\n"
+            )
+            sys.stderr.flush()
+
+        # Register in original path order to keep series ordering deterministic
+        for file_path in self._paths:
+            self._register_reader(file_path, results[file_path])
+
+    def _register_reader(self, file_path: str, reader):
+        """Register a reader and index its series."""
+        self._readers[file_path] = reader
+        for series_path in reader.series_paths:
+            if series_path not in self._series_map:
+                self._series_map[series_path] = []
+            self._series_map[series_path].append(
+                (reader, reader.series_info[series_path])
+            )
+
+    def _build_merged_info(self, series_path: str):
+        """Merge timestamps and metadata from all readers for a given 
series."""
+        entries = self._series_map[series_path]
+        # Use first entry for structural metadata (table/tag/field are 
identical across files)
+        _, first_info = entries[0]
+
+        if len(entries) == 1:
+            reader, info = entries[0]
+            self._merged_timestamps[series_path] = 
reader._timestamps_cache[series_path]
+            self._merged_info[series_path] = {
+                'table_name': info['table_name'],
+                'tag_columns': info['tag_columns'],
+                'tag_values': info['tag_values'],
+                'field': info['column_name'],
+                'min_time': info['min_time'],
+                'max_time': info['max_time'],
+                'count': info['length'],
+            }
+        else:
+            all_ts = []
+            for reader, info in entries:
+                all_ts.append(reader._timestamps_cache[series_path])
+            merged = np.unique(np.concatenate(all_ts))
+            merged.sort()

Review Comment:
   `np.unique` already returns a sorted array. The `.sort()` call is redundant.



##########
python/tsfile/__init__.py:
##########
@@ -40,4 +40,5 @@
 from .tsfile_writer import TsFileWriterPy as TsFileWriter
 from .tsfile_py_cpp import get_tsfile_config, set_tsfile_config
 from .tsfile_table_writer import TsFileTableWriter
-from .utils import to_dataframe, dataframe_to_tsfile
\ No newline at end of file
+from .utils import to_dataframe, dataframe_to_tsfile
+from .tsfile_dataframe import TsFileDataFrame, Timeseries, AlignedTimeseries

Review Comment:
   The file ends without a newline character (`\ No newline at end of file`).



##########
python/tsfile/tsfile_series_reader.py:
##########
@@ -0,0 +1,440 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+High-performance TsFile Series Reader
+
+Optimized for time series data reading using the Arrow columnar API.
+Wraps TsFileReaderPy (Cython) and provides series-level metadata
+discovery, timestamp caching, and batch reads with TAG filtering.
+"""
+
+import os
+import sys
+from typing import List, Dict, Optional, Tuple
+
+import numpy as np
+import pyarrow.compute as pc
+
+from .constants import ColumnCategory
+from .tsfile_reader import TsFileReaderPy
+
+
+class TsFileSeriesReader:
+    """
+    TsFile Series Reader
+
+    Wrapper around the Cython TsFileReaderPy for reading TsFile data
+    at the series level. Supports TAG columns: a time series is uniquely
+    identified by Table + Tag values + Field column, producing series
+    paths like "weather.beijing.humidity".
+    """
+
+    def __init__(self, file_path: str, show_progress: bool = True):
+        if not os.path.exists(file_path):
+            raise FileNotFoundError(f"TsFile not found: {file_path}")
+
+        self.file_path = file_path
+        self.show_progress = show_progress
+
+        try:
+            self._reader = TsFileReaderPy(file_path)
+        except Exception as e:
+            raise ValueError(f"Failed to open TsFile: {e}")
+
+        self.series_paths: List[str] = []
+        self.series_info: Dict[str, dict] = {}
+        self._timestamps_cache: Dict[str, np.ndarray] = {}
+        self._series_data_cache: Dict[str, np.ndarray] = {}
+
+        self._cache_metadata()
+
+    def __del__(self):
+        self.close()
+
+    def close(self):
+        """Close the underlying Cython reader."""
+        if hasattr(self, '_reader'):
+            try:
+                self._reader.close()
+            except Exception:
+                pass
+
+    def _cache_metadata(self):
+        """Cache metadata from the TsFile."""
+        try:
+            self._cache_metadata_table_model()
+        except Exception as e:
+            raise ValueError(
+                f"Failed to read TsFile metadata. "
+                f"Please ensure the TsFile is valid and readable. Error: {e}"
+            )
+
+    def _cache_metadata_table_model(self):
+        """
+        Cache metadata using table model query via Arrow batch API.
+
+        Unified logic for tables with or without TAG columns.
+        """
+        table_schemas = self._reader.get_all_table_schemas()
+        if not table_schemas:
+            raise ValueError("No tables found in TsFile")
+
+        self.series_paths = []
+        table_names = list(table_schemas.keys())
+
+        # Progress tracking
+        total_rows = 0
+
+        for ti, table_name in enumerate(table_names):
+            table_schema = self._reader.get_table_schema(table_name)
+
+            tag_columns = []
+            field_columns = []
+            for col_schema in table_schema.get_columns():
+                col_name = col_schema.get_column_name()
+                col_category = col_schema.get_category()
+                if col_name.lower() == 'time':
+                    continue
+                if col_category == ColumnCategory.TAG:
+                    tag_columns.append(col_name)
+                elif col_category == ColumnCategory.FIELD:
+                    field_columns.append(col_name)
+
+            if not field_columns:
+                continue
+
+            # Query TAG columns + first FIELD column to discover groups and 
timestamps
+            query_cols = tag_columns + [field_columns[0]]
+
+            time_arrays = []
+            tag_arrays = {tc: [] for tc in tag_columns}
+
+            with self._reader.query_table_batch(
+                table_name, query_cols, batch_size=65536
+            ) as rs:
+                while True:
+                    arrow_table = rs.read_arrow_batch()
+                    if arrow_table is None:
+                        break
+                    batch_rows = arrow_table.num_rows
+                    total_rows += batch_rows
+                    time_arrays.append(arrow_table.column('time').to_numpy())
+                    for tc in tag_columns:
+                        
tag_arrays[tc].append(arrow_table.column(tc).to_numpy())
+
+                    if self.show_progress:
+                        sys.stderr.write(
+                            f"\rReading TsFile metadata: "
+                            f"table {ti + 1}/{len(table_names)} "
+                            f"[{table_name}] "
+                            f"({total_rows:,} rows)"
+                        )
+                        sys.stderr.flush()
+
+            if not time_arrays:
+                continue
+
+            all_times = np.concatenate(time_arrays).astype(np.int64)
+
+            if tag_columns:
+                # Merge tag columns and group by unique tag combinations
+                all_tags = {tc: np.concatenate(tag_arrays[tc]) for tc in 
tag_columns}
+
+                # Build a composite key for grouping
+                if len(tag_columns) == 1:
+                    tag_key = all_tags[tag_columns[0]]
+                    unique_keys = np.unique(tag_key)
+                    for uk in unique_keys:
+                        mask = tag_key == uk
+                        tag_values = (uk,) if not isinstance(uk, tuple) else uk
+                        self._register_tag_group(
+                            table_name, tag_columns, tag_values,
+                            field_columns, all_times[mask]
+                        )
+                else:
+                    # Multiple tag columns: use structured approach
+                    # Convert to list of tuples for grouping
+                    n = len(all_times)
+                    tag_tuples = [
+                        tuple(all_tags[tc][i] for tc in tag_columns)
+                        for i in range(n)
+                    ]
+                    unique_tuples = list(dict.fromkeys(tag_tuples))
+                    for ut in unique_tuples:
+                        mask = np.array([t == ut for t in tag_tuples], 
dtype=bool)
+                        self._register_tag_group(
+                            table_name, tag_columns, ut,
+                            field_columns, all_times[mask]
+                        )
+            else:
+                # No TAG columns: single group
+                self._register_tag_group(
+                    table_name, tag_columns, (),
+                    field_columns, all_times
+                )
+
+        if self.show_progress and total_rows > 0:
+            sys.stderr.write(
+                f"\rReading TsFile metadata: "
+                f"{len(table_names)} table(s), "
+                f"{total_rows:,} rows, "
+                f"{len(self.series_paths)} series "
+                f"... done\n"
+            )
+            sys.stderr.flush()
+
+        if not self.series_paths:
+            raise ValueError("No valid numeric series found in TsFile")
+
+    def _register_tag_group(
+        self, table_name: str, tag_columns: List[str],
+        tag_values: tuple, field_columns: List[str], timestamps: np.ndarray
+    ):
+        """Register all field series for a given table + tag group."""
+        timestamps = np.sort(timestamps)
+
+        if len(timestamps) == 0:
+            return
+
+        if tag_columns:
+            tag_part = ".".join(str(v) for v in tag_values)
+        else:
+            tag_part = ""
+
+        tag_values_dict = dict(zip(tag_columns, tag_values)) if tag_columns 
else {}
+
+        for field_col in field_columns:
+            if tag_part:
+                series_path = f"{table_name}.{tag_part}.{field_col}"
+            else:
+                series_path = f"{table_name}.{field_col}"
+
+            self.series_paths.append(series_path)
+            self._timestamps_cache[series_path] = timestamps
+            self.series_info[series_path] = {
+                'length': len(timestamps),
+                'min_time': int(timestamps[0]),
+                'max_time': int(timestamps[-1]),
+                'table_name': table_name,
+                'column_name': field_col,
+                'tag_columns': tag_columns,
+                'tag_values': tag_values_dict,
+            }
+
+    def get_all_series(self) -> List[str]:
+        """Return a list of all discovered series paths."""
+        return self.series_paths.copy()
+
+    def get_series_length(self, series_path: str) -> int:
+        """Return the number of data points for a series."""
+        if series_path not in self.series_info:
+            raise ValueError(f"Series not found: {series_path}")
+        return self.series_info[series_path]['length']
+
+    def read_series(self, series_path: str) -> List[float]:
+        """Read all data points for a series.
+
+        Args:
+            series_path: Time series path.
+
+        Returns:
+            List of data points.
+        """
+        if series_path not in self.series_info:
+            raise ValueError(f"Series not found: {series_path}")
+        if series_path in self._series_data_cache:
+            return self._series_data_cache[series_path].tolist()
+        length = self.series_info[series_path]['length']
+        return self.read_series_range(series_path, 0, length)
+
+    def read_series_range(self, series_path: str, start: int, end: int) -> 
List[float]:
+        """Read specified range of time series by row index.
+
+        Args:
+            series_path: Time series path.
+            start: Start index (inclusive).
+            end: End index (exclusive).
+
+        Returns:
+            List of data points.
+        """
+        if series_path not in self.series_info:
+            raise ValueError(f"Series not found: {series_path}")
+
+        if series_path in self._series_data_cache:
+            return self._series_data_cache[series_path][start:end].tolist()
+
+        info = self.series_info[series_path]
+        timestamps = self._timestamps_cache[series_path]
+
+        start_time = int(timestamps[start])
+        end_time = int(timestamps[end - 1])
+
+        _, vals = self._read_arrow(
+            info['table_name'],
+            [info['column_name']],
+            info['tag_columns'],
+            info['tag_values'],
+            start_time, end_time,
+        )
+        return vals[info['column_name']].tolist()
+
+    def read_series_by_time_range(
+        self, series_path: str, start_time: int, end_time: int
+    ) -> Tuple[np.ndarray, np.ndarray]:
+        """Read data by time range directly (for loc-style queries).
+
+        Args:
+            series_path: Time series path.
+            start_time: Start timestamp (inclusive, ms).
+            end_time: End timestamp (inclusive, ms).
+
+        Returns:
+            Tuple of (timestamps_array, values_array).
+        """
+        if series_path not in self.series_info:
+            raise ValueError(f"Series not found: {series_path}")
+
+        info = self.series_info[series_path]
+        ts_arr, field_vals = self._read_arrow(
+            info['table_name'],
+            [info['column_name']],
+            info['tag_columns'],
+            info['tag_values'],
+            start_time, end_time,
+        )
+        if len(ts_arr) > 0:
+            return ts_arr, field_vals[info['column_name']]
+        return np.array([], dtype=np.int64), np.array([], dtype=np.float64)
+
+    def read_multi_series_by_time_range(
+        self,
+        table_name: str,
+        field_columns: List[str],
+        tag_columns: List[str],
+        tag_values: Dict[str, str],
+        start_time: int,
+        end_time: int,
+    ) -> Tuple[np.ndarray, Dict[str, np.ndarray]]:
+        """Read multiple field columns from the same table+tag group in one 
query.
+
+        Args:
+            table_name: TsFile table name.
+            field_columns: List of field column names to read.
+            tag_columns: List of tag column names.
+            tag_values: Dict of tag column name to tag value.
+            start_time: Start timestamp (inclusive, ms).
+            end_time: End timestamp (inclusive, ms).
+
+        Returns:
+            (timestamps_array, {field_name: values_array}).
+        """
+        return self._read_arrow(
+            table_name, field_columns, tag_columns, tag_values,
+            start_time, end_time,
+        )
+
+    def _read_arrow(
+        self,
+        table_name: str,
+        field_columns: List[str],
+        tag_columns: List[str],
+        tag_values: Dict[str, str],
+        start_time: int,
+        end_time: int,
+    ) -> Tuple[np.ndarray, Dict[str, np.ndarray]]:
+        """Core Arrow batch reader.
+
+        Read one or more field columns from a single table+tag group
+        via query_table_batch + read_arrow_batch.
+
+        Args:
+            table_name: TsFile table name.
+            field_columns: Field columns to read.
+            tag_columns: Tag column names (for query).
+            tag_values: Tag filter values.
+            start_time: Start timestamp (inclusive, ms).
+            end_time: End timestamp (inclusive, ms).
+
+        Returns:
+            (timestamps_array, {field_name: values_array}).
+        """
+        if tag_columns:
+            query_cols = tag_columns + field_columns
+        else:
+            query_cols = list(field_columns)
+
+        ts_list = []
+        field_lists = {fc: [] for fc in field_columns}
+
+        with self._reader.query_table_batch(
+            table_name, query_cols,
+            start_time=start_time, end_time=end_time, batch_size=65536
+        ) as rs:
+            while True:
+                arrow_table = rs.read_arrow_batch()
+                if arrow_table is None:
+                    break
+
+                if tag_values:
+                    mask = None
+                    for tag_col, tag_val in tag_values.items():
+                        col_mask = pc.equal(arrow_table.column(tag_col), 
tag_val)
+                        mask = col_mask if mask is None else pc.and_(mask, 
col_mask)
+                    arrow_table = arrow_table.filter(mask)
+
+                if arrow_table.num_rows > 0:
+                    ts_list.append(arrow_table.column('time').to_numpy())
+                    for fc in field_columns:
+                        field_lists[fc].append(
+                            
arrow_table.column(fc).to_numpy().astype(np.float64)
+                        )
+
+        if ts_list:
+            return (
+                np.concatenate(ts_list).astype(np.int64),
+                {fc: np.concatenate(field_lists[fc]) for fc in field_columns},
+            )
+        return (
+            np.array([], dtype=np.int64),
+            {fc: np.array([], dtype=np.float64) for fc in field_columns},
+        )
+
+    def cache_series_data(self, series_path: str):
+        """Pre-load series data into memory cache.
+
+        Args:
+            series_path: Time series path.
+        """
+        if series_path not in self.series_info:
+            raise ValueError(f"Series not found: {series_path}")
+        if series_path not in self._series_data_cache:
+            data = self.read_series(series_path)
+            self._series_data_cache[series_path] = np.array(data, 
dtype=np.float32)

Review Comment:
   `read_series` returns float64 values (via `_read_arrow` which casts to 
`np.float64`), but `cache_series_data` stores them as float32. When 
`read_series` is later called on a cached series, it returns float32 values 
silently, which can lose precision.



##########
python/tsfile/tsfile_series_reader.py:
##########
@@ -0,0 +1,440 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+High-performance TsFile Series Reader
+
+Optimized for time series data reading using the Arrow columnar API.
+Wraps TsFileReaderPy (Cython) and provides series-level metadata
+discovery, timestamp caching, and batch reads with TAG filtering.
+"""
+
+import os
+import sys
+from typing import List, Dict, Optional, Tuple
+
+import numpy as np
+import pyarrow.compute as pc
+
+from .constants import ColumnCategory
+from .tsfile_reader import TsFileReaderPy
+
+
+class TsFileSeriesReader:
+    """
+    TsFile Series Reader
+
+    Wrapper around the Cython TsFileReaderPy for reading TsFile data
+    at the series level. Supports TAG columns: a time series is uniquely
+    identified by Table + Tag values + Field column, producing series
+    paths like "weather.beijing.humidity".
+    """
+
+    def __init__(self, file_path: str, show_progress: bool = True):
+        if not os.path.exists(file_path):
+            raise FileNotFoundError(f"TsFile not found: {file_path}")
+
+        self.file_path = file_path
+        self.show_progress = show_progress
+
+        try:
+            self._reader = TsFileReaderPy(file_path)
+        except Exception as e:
+            raise ValueError(f"Failed to open TsFile: {e}")
+
+        self.series_paths: List[str] = []
+        self.series_info: Dict[str, dict] = {}
+        self._timestamps_cache: Dict[str, np.ndarray] = {}
+        self._series_data_cache: Dict[str, np.ndarray] = {}
+
+        self._cache_metadata()
+
+    def __del__(self):
+        self.close()
+
+    def close(self):
+        """Close the underlying Cython reader."""
+        if hasattr(self, '_reader'):
+            try:
+                self._reader.close()
+            except Exception:
+                pass
+
+    def _cache_metadata(self):
+        """Cache metadata from the TsFile."""
+        try:
+            self._cache_metadata_table_model()
+        except Exception as e:
+            raise ValueError(
+                f"Failed to read TsFile metadata. "
+                f"Please ensure the TsFile is valid and readable. Error: {e}"
+            )
+
+    def _cache_metadata_table_model(self):
+        """
+        Cache metadata using table model query via Arrow batch API.
+
+        Unified logic for tables with or without TAG columns.
+        """
+        table_schemas = self._reader.get_all_table_schemas()
+        if not table_schemas:
+            raise ValueError("No tables found in TsFile")
+
+        self.series_paths = []
+        table_names = list(table_schemas.keys())
+
+        # Progress tracking
+        total_rows = 0
+
+        for ti, table_name in enumerate(table_names):
+            table_schema = self._reader.get_table_schema(table_name)
+
+            tag_columns = []
+            field_columns = []
+            for col_schema in table_schema.get_columns():
+                col_name = col_schema.get_column_name()
+                col_category = col_schema.get_category()
+                if col_name.lower() == 'time':
+                    continue
+                if col_category == ColumnCategory.TAG:
+                    tag_columns.append(col_name)
+                elif col_category == ColumnCategory.FIELD:
+                    field_columns.append(col_name)
+
+            if not field_columns:
+                continue
+
+            # Query TAG columns + first FIELD column to discover groups and 
timestamps
+            query_cols = tag_columns + [field_columns[0]]
+
+            time_arrays = []
+            tag_arrays = {tc: [] for tc in tag_columns}
+
+            with self._reader.query_table_batch(
+                table_name, query_cols, batch_size=65536
+            ) as rs:
+                while True:
+                    arrow_table = rs.read_arrow_batch()
+                    if arrow_table is None:
+                        break
+                    batch_rows = arrow_table.num_rows
+                    total_rows += batch_rows
+                    time_arrays.append(arrow_table.column('time').to_numpy())
+                    for tc in tag_columns:
+                        
tag_arrays[tc].append(arrow_table.column(tc).to_numpy())
+
+                    if self.show_progress:
+                        sys.stderr.write(
+                            f"\rReading TsFile metadata: "
+                            f"table {ti + 1}/{len(table_names)} "
+                            f"[{table_name}] "
+                            f"({total_rows:,} rows)"
+                        )
+                        sys.stderr.flush()
+
+            if not time_arrays:
+                continue
+
+            all_times = np.concatenate(time_arrays).astype(np.int64)
+
+            if tag_columns:
+                # Merge tag columns and group by unique tag combinations
+                all_tags = {tc: np.concatenate(tag_arrays[tc]) for tc in 
tag_columns}
+
+                # Build a composite key for grouping
+                if len(tag_columns) == 1:
+                    tag_key = all_tags[tag_columns[0]]
+                    unique_keys = np.unique(tag_key)
+                    for uk in unique_keys:
+                        mask = tag_key == uk
+                        tag_values = (uk,) if not isinstance(uk, tuple) else uk
+                        self._register_tag_group(
+                            table_name, tag_columns, tag_values,
+                            field_columns, all_times[mask]
+                        )
+                else:
+                    # Multiple tag columns: use structured approach
+                    # Convert to list of tuples for grouping
+                    n = len(all_times)
+                    tag_tuples = [
+                        tuple(all_tags[tc][i] for tc in tag_columns)
+                        for i in range(n)
+                    ]
+                    unique_tuples = list(dict.fromkeys(tag_tuples))
+                    for ut in unique_tuples:
+                        mask = np.array([t == ut for t in tag_tuples], 
dtype=bool)
+                        self._register_tag_group(
+                            table_name, tag_columns, ut,
+                            field_columns, all_times[mask]

Review Comment:
   For tables with multiple tag columns, grouping is done by:
   1. Building a Python list of tuples (line 174-177): O(n)
   2. For each unique tuple, scanning the entire list to build a boolean mask 
(line 180): O(n * k) where k = number of unique groups
   
   For a table with 1M rows and 1000 tag groups, this becomes ~1 billion 
comparisons in pure Python.



##########
python/tsfile/tsfile_dataframe.py:
##########
@@ -0,0 +1,859 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+TsFileDataFrame - Lazy-loaded unified view over multiple TsFile files.
+
+Provides array-like and DataFrame-like access to time series data
+stored in TsFile format. Supports:
+1. Pre-training: random index access with array-like slicing
+2. Post-training: time-aligned multi-series queries via .loc
+"""
+
+import os
+import sys
+from collections import defaultdict
+from typing import List, Dict, Union, Optional, Tuple
+from datetime import datetime
+
+import numpy as np
+
+
+def _format_timestamp(ts_ms: int) -> str:
+    """Convert millisecond timestamp to human-readable string."""
+    try:
+        return datetime.fromtimestamp(ts_ms / 1000).strftime('%Y-%m-%d 
%H:%M:%S')
+    except (OSError, ValueError):
+        return str(ts_ms)
+
+
+class AlignedTimeseries:
+    """
+    Time-aligned multi-series query result with timestamps.
+
+    Returned by .loc[...] and df[slice/list]. Supports:
+    - result.timestamps -> np.ndarray of ms timestamps
+    - result.values -> np.ndarray of shape (rows, cols)
+    - result.series_names -> list of series name strings
+    - result[i] / result[i, j] -> index into values
+    - print(result) -> truncated table (20 rows)
+    - result.show() -> full table (no truncation)
+    - result.show(50) -> show up to 50 rows
+    """
+
+    def __init__(self, timestamps: np.ndarray, values: np.ndarray, 
series_names: List[str]):
+        self.timestamps = timestamps
+        self.values = values
+        self.series_names = series_names
+
+    @property
+    def shape(self):
+        return self.values.shape
+
+    def __len__(self):
+        return len(self.timestamps)
+
+    def __getitem__(self, key):
+        return self.values[key]
+
+    def _build_display(self):
+        """Pre-compute string representations for display."""
+        n_rows, n_cols = self.values.shape
+        ts_strs = [_format_timestamp(int(t)) for t in self.timestamps]
+        ts_width = max((len(s) for s in ts_strs), default=0)
+        ts_width = max(ts_width, len('timestamp'))
+
+        col_widths = []
+        val_strs = []
+        for col_idx in range(n_cols):
+            col_name = self.series_names[col_idx] if col_idx < 
len(self.series_names) else f'col_{col_idx}'
+            w = len(col_name)
+            col_vals = []
+            for row_idx in range(n_rows):
+                v = self.values[row_idx, col_idx]
+                s = 'NaN' if np.isnan(v) else f'{v:.2f}'
+                col_vals.append(s)
+                w = max(w, len(s))
+            val_strs.append(col_vals)
+            col_widths.append(w)
+
+        return ts_strs, ts_width, col_widths, val_strs
+
+    def _format_rows(self, ts_strs, ts_width, col_widths, val_strs, max_rows):
+        """Format rows with optional truncation."""
+        n_rows = len(ts_strs)
+        n_cols = len(col_widths)
+
+        header_parts = ['timestamp'.rjust(ts_width)]
+        for col_idx in range(n_cols):
+            col_name = self.series_names[col_idx] if col_idx < 
len(self.series_names) else f'col_{col_idx}'
+            header_parts.append(col_name.rjust(col_widths[col_idx]))
+        lines = ['  '.join(header_parts)]
+
+        if max_rows is None or n_rows <= max_rows:
+            show_rows = list(range(n_rows))
+        else:
+            show_rows = list(range(max_rows))
+
+        for row_idx in show_rows:
+            parts = [ts_strs[row_idx].rjust(ts_width)]
+            for col_idx in range(n_cols):
+                
parts.append(val_strs[col_idx][row_idx].rjust(col_widths[col_idx]))
+            lines.append('  '.join(parts))
+
+        return f"AlignedTimeseries({n_rows} rows, {n_cols} series)\n" + 
'\n'.join(lines)
+
+    def __repr__(self):
+        n_rows, n_cols = self.values.shape
+        if n_rows == 0:
+            return f"AlignedTimeseries(0 rows, {n_cols} series)"
+        ts_strs, ts_width, col_widths, val_strs = self._build_display()
+        return self._format_rows(ts_strs, ts_width, col_widths, val_strs, 
max_rows=20)
+
+    def show(self, max_rows: Optional[int] = None):
+        """Print formatted table with configurable row limit.
+
+        Args:
+            max_rows: Maximum rows to display. None for all rows.
+        """
+        n_rows, n_cols = self.values.shape
+        if n_rows == 0:
+            print(f"AlignedTimeseries(0 rows, {n_cols} series)")
+            return
+        ts_strs, ts_width, col_widths, val_strs = self._build_display()
+        print(self._format_rows(ts_strs, ts_width, col_widths, val_strs, 
max_rows))
+
+
+class Timeseries:
+    """
+    Single time series abstraction.
+
+    Supports row-based slicing (converting to time-range queries internally),
+    stats access, and length queries.
+
+    When a series spans multiple files, data is merged transparently.
+    """
+
+    def __init__(self, name: str, readers_and_infos: list, merged_timestamps: 
np.ndarray):
+        """
+        Args:
+            name: Full series path (e.g., "weather.Beijing.humidity").
+            readers_and_infos: List of (reader, series_info) tuples for this 
series.
+            merged_timestamps: Sorted, deduplicated timestamp array across all 
files.
+        """
+        self._name = name
+        self._readers_and_infos = readers_and_infos
+        self._timestamps = merged_timestamps
+
+    @property
+    def name(self) -> str:
+        return self._name
+
+    @property
+    def timestamps(self) -> np.ndarray:
+        return self._timestamps
+
+    @property
+    def stats(self) -> dict:
+        count = len(self._timestamps)
+        if count == 0:
+            return {'start_time': None, 'end_time': None, 'count': 0}
+        return {
+            'start_time': int(self._timestamps[0]),
+            'end_time': int(self._timestamps[-1]),
+            'count': count,
+        }
+
+    def __len__(self) -> int:
+        return len(self._timestamps)
+
+    def __getitem__(self, key):
+        """Row-based access.
+
+        - series[20] -> single float value
+        - series[20:100] -> np.ndarray of values
+        """
+        length = len(self._timestamps)
+
+        if isinstance(key, int):
+            if key < 0:
+                key = length + key
+            if key < 0 or key >= length:
+                raise IndexError(f"Index {key} out of range [0, {length})")
+            ts = int(self._timestamps[key])
+            _, vals = self._query_time_range(ts, ts)
+            return float(vals[0]) if len(vals) > 0 else None
+
+        elif isinstance(key, slice):
+            start, stop, step = key.indices(length)
+            if start >= stop:
+                return np.array([], dtype=np.float64)
+
+            # Get exact timestamps for the requested rows
+            requested_ts = self._timestamps[start:stop]
+            if len(requested_ts) == 0:
+                return np.array([], dtype=np.float64)
+
+            # Query by time range, then filter to exact timestamps
+            start_ts = int(requested_ts[0])
+            end_ts = int(requested_ts[-1])
+            ts_arr, vals = self._query_time_range(start_ts, end_ts)
+
+            # Vectorized alignment: both ts_arr and requested_ts are sorted
+            result = np.full(len(requested_ts), np.nan)
+            if len(ts_arr) > 0:
+                indices = np.searchsorted(ts_arr, requested_ts)
+                valid = (indices < len(ts_arr)) & (ts_arr[np.minimum(indices, 
len(ts_arr) - 1)] == requested_ts)
+                result[valid] = vals[indices[valid]]
+            vals = result
+
+            if step != 1:
+                vals = vals[::step]
+            return vals
+
+        else:
+            raise TypeError(f"Unsupported key type: {type(key)}")
+
+    def _query_time_range(self, start_time: int, end_time: int) -> 
Tuple[np.ndarray, np.ndarray]:
+        """Query all readers for this series in the given time range, merge 
results."""
+        all_ts = []
+        all_vals = []
+        for reader, info in self._readers_and_infos:
+            # Skip reader if its data doesn't overlap
+            if info['max_time'] < start_time or info['min_time'] > end_time:
+                continue
+            ts_arr, val_arr = reader.read_series_by_time_range(
+                self._name, start_time, end_time
+            )
+            if len(ts_arr) > 0:
+                all_ts.append(ts_arr)
+                all_vals.append(val_arr)
+
+        if not all_ts:
+            return np.array([], dtype=np.int64), np.array([], dtype=np.float64)
+
+        if len(all_ts) == 1:
+            return all_ts[0], all_vals[0]
+
+        # Merge from multiple readers, sort by timestamp, deduplicate
+        merged_ts = np.concatenate(all_ts)
+        merged_vals = np.concatenate(all_vals)
+        sort_idx = np.argsort(merged_ts, kind='mergesort')
+        merged_ts = merged_ts[sort_idx]
+        merged_vals = merged_vals[sort_idx]
+
+        # Deduplicate by timestamp (keep first occurrence)
+        _, unique_idx = np.unique(merged_ts, return_index=True)
+        return merged_ts[unique_idx], merged_vals[unique_idx]
+
+    def __repr__(self):
+        stats = self.stats
+        if stats['count'] == 0:
+            return f"Timeseries('{self._name}', count=0)"
+        return (
+            f"Timeseries('{self._name}', count={stats['count']}, "
+            f"start={_format_timestamp(stats['start_time'])}, "
+            f"end={_format_timestamp(stats['end_time'])})"
+        )
+
+
+class _LocIndexer:
+    """
+    Implements .loc[start_time:end_time, series_list] for time-aligned queries.
+
+    Returns AlignedTimeseries with timestamps, values, and series names.
+    """
+
+    def __init__(self, dataframe: 'TsFileDataFrame'):
+        self._df = dataframe
+
+    def _parse_key(self, key):
+        """Parse key into (start_time, end_time, series_names)."""
+        if not isinstance(key, tuple) or len(key) != 2:
+            raise ValueError(
+                "loc requires exactly 2 arguments: "
+                "tsdf.loc[start_time:end_time, series_list]"
+            )
+
+        time_slice, series_spec = key
+
+        # Parse time range
+        if isinstance(time_slice, slice):
+            start_time = time_slice.start
+            end_time = time_slice.stop
+            if start_time is None:
+                start_time = np.iinfo(np.int64).min
+            if end_time is None:
+                end_time = np.iinfo(np.int64).max
+        elif isinstance(time_slice, (int, np.integer)):
+            start_time = end_time = int(time_slice)
+        else:
+            raise TypeError(f"Time index must be slice or int, got 
{type(time_slice)}")
+
+        # Parse series names
+        if isinstance(series_spec, (str, int)):
+            series_spec = [series_spec]
+
+        series_names = []
+        for s in series_spec:
+            if isinstance(s, (int, np.integer)):
+                idx = int(s)
+                if idx < 0 or idx >= len(self._df._series_list):
+                    raise IndexError(f"Series index {idx} out of range")
+                series_names.append(self._df._series_list[idx])
+            elif isinstance(s, str):
+                if s not in self._df._series_map:
+                    raise KeyError(f"Series not found: {s}")
+                series_names.append(s)
+            else:
+                raise TypeError(f"Series specifier must be int or str, got 
{type(s)}")
+
+        return start_time, end_time, series_names
+
+    def _query_aligned(self, start_time: int, end_time: int, series_names: 
List[str]):
+        """Query and align multiple series using batched Arrow reads."""
+        # Group series by (reader_id, table_name, tag_tuple) for batch queries.
+        # Each group can be fetched with a single query_table_batch call.
+        groups = defaultdict(list)  # key -> [(col_idx, field_name, 
series_name, reader, info)]
+
+        for col_idx, name in enumerate(series_names):
+            entries = self._df._series_map[name]
+            for reader, info in entries:
+                if info['max_time'] < start_time or info['min_time'] > 
end_time:
+                    continue
+                key = (id(reader), info['table_name'],
+                       tuple(sorted(info['tag_values'].items())))
+                groups[key].append((col_idx, info['column_name'], name, 
reader, info))
+
+        # Fetch data: one query per group
+        series_data = {}  # series_name -> (ts_arr, val_arr)
+
+        for key, entries in groups.items():
+            reader = entries[0][3]
+            info = entries[0][4]
+            field_columns = list(dict.fromkeys(e[1] for e in entries))  # 
dedupe, keep order
+
+            ts_arr, field_vals = reader.read_multi_series_by_time_range(
+                info['table_name'], field_columns,
+                info['tag_columns'], info['tag_values'],
+                start_time, end_time,
+            )
+
+            for _col_idx, field_name, name, _, _ in entries:
+                if name in series_data:
+                    # Series spans multiple readers: merge
+                    prev_ts, prev_val = series_data[name]
+                    merged_ts = np.concatenate([prev_ts, ts_arr])
+                    merged_val = np.concatenate([prev_val, 
field_vals[field_name]])
+                    sort_idx = np.argsort(merged_ts, kind='mergesort')
+                    merged_ts = merged_ts[sort_idx]
+                    merged_val = merged_val[sort_idx]
+                    _, unique_idx = np.unique(merged_ts, return_index=True)
+                    series_data[name] = (merged_ts[unique_idx], 
merged_val[unique_idx])
+                else:
+                    series_data[name] = (ts_arr, field_vals[field_name])
+
+        # Collect unique timestamps using numpy
+        all_ts_arrays = [data[0] for data in series_data.values() if 
len(data[0]) > 0]
+        if not all_ts_arrays:
+            return (np.array([], dtype=np.int64),
+                    np.array([]).reshape(0, len(series_names)))
+
+        sorted_timestamps = np.unique(np.concatenate(all_ts_arrays))
+
+        # Build result matrix using np.searchsorted for vectorized alignment
+        result = np.full((len(sorted_timestamps), len(series_names)), np.nan)
+
+        for col_idx, name in enumerate(series_names):
+            if name not in series_data:
+                continue
+            ts_arr, val_arr = series_data[name]
+            if len(ts_arr) == 0:
+                continue
+            indices = np.searchsorted(sorted_timestamps, ts_arr)
+            result[indices, col_idx] = val_arr
+
+        return sorted_timestamps, result
+
+    def __getitem__(self, key) -> 'AlignedTimeseries':
+        start_time, end_time, series_names = self._parse_key(key)
+        timestamps, values = self._query_aligned(start_time, end_time, 
series_names)
+        return AlignedTimeseries(timestamps, values, series_names)
+
+
+class TsFileDataFrame:
+    """
+    Lazy-loaded unified view over multiple TsFile files.
+
+    Each dimension is a time series identified by Table + Tags + Field.
+    Supports cross-file merging of same-named series.
+    """
+
+    def __init__(self, paths: Union[str, List[str]], show_progress: bool = 
True):
+        if isinstance(paths, str):
+            paths = [paths]
+
+        # Expand directories: collect all .tsfile files within each directory
+        expanded = []
+        for p in paths:
+            if os.path.isdir(p):
+                tsfiles = sorted(
+                    os.path.join(root, f)
+                    for root, _, files in os.walk(p)
+                    for f in files
+                    if f.endswith('.tsfile')
+                )
+                if not tsfiles:
+                    raise FileNotFoundError(
+                        f"No .tsfile files found in directory: {p}"
+                    )
+                expanded.extend(tsfiles)
+            else:
+                expanded.append(p)
+
+        self._paths = []
+        for p in expanded:
+            if not os.path.exists(p):
+                raise FileNotFoundError(f"TsFile not found: {p}")
+            self._paths.append(os.path.abspath(p))
+
+        self._show_progress = show_progress
+        self._readers = {}
+        self._series_list: List[str] = []
+        self._series_map: Dict[str, list] = {}  # series_path -> [(reader, 
info), ...]
+        self._merged_timestamps: Dict[str, np.ndarray] = {}
+        self._merged_info: Dict[str, dict] = {}
+        self._name_to_index: Dict[str, int] = {}
+
+        self._is_view = False
+        self._root = None
+
+        self._load_metadata()
+
+    @classmethod
+    def _from_subset(cls, parent: 'TsFileDataFrame', series_names: List[str]) 
-> 'TsFileDataFrame':
+        """Create a lightweight view over a subset of series.
+
+        Shares readers, series_map, merged_timestamps, and merged_info
+        with the root. Does NOT own any readers and will not close them.
+        """
+        obj = object.__new__(cls)
+        obj._root = parent._root if parent._is_view else parent
+        obj._is_view = True
+        obj._paths = parent._paths
+        obj._readers = parent._readers
+        obj._series_map = parent._series_map
+        obj._merged_timestamps = parent._merged_timestamps
+        obj._merged_info = parent._merged_info
+        obj._show_progress = parent._show_progress
+        obj._series_list = list(series_names)
+        obj._name_to_index = {name: i for i, name in enumerate(series_names)}
+        return obj
+
+    def _load_metadata(self):
+        """Load metadata from all TsFile files."""
+        from .tsfile_series_reader import TsFileSeriesReader
+
+        if len(self._paths) >= 2:
+            self._load_metadata_parallel(TsFileSeriesReader)
+        else:
+            self._load_metadata_serial(TsFileSeriesReader)
+
+        # Build ordered series list and merged metadata
+        seen = set()
+        for file_path in self._paths:
+            reader = self._readers[file_path]
+            for series_path in reader.series_paths:
+                if series_path not in seen:
+                    seen.add(series_path)
+                    self._series_list.append(series_path)
+                    self._name_to_index[series_path] = len(self._series_list) 
- 1
+                    self._build_merged_info(series_path)
+
+        if not self._series_list:
+            raise ValueError("No valid time series found in the provided 
TsFile files")
+
+    def _load_metadata_serial(self, ReaderClass):
+        """Load metadata from files sequentially."""
+        for file_path in self._paths:
+            reader = ReaderClass(file_path, show_progress=self._show_progress)
+            self._register_reader(file_path, reader)
+
+    def _load_metadata_parallel(self, ReaderClass):
+        """Load metadata from files in parallel using threads."""
+        from concurrent.futures import ThreadPoolExecutor, as_completed
+
+        def _open_file(file_path):
+            return file_path, ReaderClass(file_path, show_progress=False)
+
+        num_workers = min(len(self._paths), os.cpu_count() or 4)
+        total = len(self._paths)
+        with ThreadPoolExecutor(max_workers=num_workers) as executor:
+            futures = {executor.submit(_open_file, p): p for p in self._paths}
+            results = {}
+            done_count = 0
+            for future in as_completed(futures):
+                file_path, reader = future.result()
+                results[file_path] = reader
+                done_count += 1
+                if self._show_progress:
+                    sys.stderr.write(
+                        f"\rLoading TsFile shards: "
+                        f"{done_count}/{total}"
+                    )
+                    sys.stderr.flush()
+
+        if self._show_progress and total > 0:
+            total_series = sum(len(r.series_paths) for r in results.values())
+            sys.stderr.write(
+                f"\rLoading TsFile shards: "
+                f"{total}/{total} "
+                f"({total_series} series) "
+                f"... done\n"
+            )
+            sys.stderr.flush()
+
+        # Register in original path order to keep series ordering deterministic
+        for file_path in self._paths:
+            self._register_reader(file_path, results[file_path])
+
+    def _register_reader(self, file_path: str, reader):
+        """Register a reader and index its series."""
+        self._readers[file_path] = reader
+        for series_path in reader.series_paths:
+            if series_path not in self._series_map:
+                self._series_map[series_path] = []
+            self._series_map[series_path].append(
+                (reader, reader.series_info[series_path])
+            )
+
+    def _build_merged_info(self, series_path: str):
+        """Merge timestamps and metadata from all readers for a given 
series."""
+        entries = self._series_map[series_path]
+        # Use first entry for structural metadata (table/tag/field are 
identical across files)
+        _, first_info = entries[0]
+
+        if len(entries) == 1:
+            reader, info = entries[0]
+            self._merged_timestamps[series_path] = 
reader._timestamps_cache[series_path]
+            self._merged_info[series_path] = {
+                'table_name': info['table_name'],
+                'tag_columns': info['tag_columns'],
+                'tag_values': info['tag_values'],
+                'field': info['column_name'],
+                'min_time': info['min_time'],
+                'max_time': info['max_time'],
+                'count': info['length'],
+            }
+        else:
+            all_ts = []
+            for reader, info in entries:
+                all_ts.append(reader._timestamps_cache[series_path])
+            merged = np.unique(np.concatenate(all_ts))
+            merged.sort()
+            self._merged_timestamps[series_path] = merged
+            self._merged_info[series_path] = {
+                'table_name': first_info['table_name'],
+                'tag_columns': first_info['tag_columns'],
+                'tag_values': first_info['tag_values'],
+                'field': first_info['column_name'],
+                'min_time': int(merged[0]),
+                'max_time': int(merged[-1]),
+                'count': len(merged),
+            }
+
+    def __len__(self) -> int:
+        return len(self._series_list)
+
+    @property
+    def timeseries(self) -> List[dict]:
+        """Return metadata for each series as a list of dicts."""
+        result = []
+        for name in self._series_list:
+            info = self._merged_info[name]
+            entry = {
+                'table': info['table_name'],
+                'field': info['field'],
+                'start_time': _format_timestamp(info['min_time']),
+                'end_time': _format_timestamp(info['max_time']),
+                'count': info['count'],
+            }
+            for col in info['tag_columns']:
+                entry[col] = info['tag_values'].get(col, '')
+            result.append(entry)
+        return result
+
+    def list_timeseries(self, path_prefix: str = "") -> List[str]:
+        """List series paths, optionally filtered by prefix.
+
+        Args:
+            path_prefix: If given, only series whose path starts with this 
prefix.
+
+        Returns:
+            List of series path strings.
+        """
+        if not path_prefix:
+            return list(self._series_list)
+        prefix = path_prefix if path_prefix.endswith('.') else path_prefix + 
'.'
+        return [name for name in self._series_list
+                if name.startswith(prefix) or name == path_prefix]
+
+    def metadata(self):
+        """Return metadata for all time series as a pandas DataFrame.
+
+        Each row represents one time series. Columns are:
+          - series_path : full dotted path (table[.tag_values...].field)
+          - table       : TsFile table name
+          - <tag_col>   : one column per tag column, filled with the tag value
+                          (empty string if this series has no such tag)
+          - field       : measurement / field column name
+          - start_time  : earliest timestamp (ms, int)
+          - end_time    : latest  timestamp (ms, int)
+          - count       : number of data points
+
+        Returns:
+            pandas.DataFrame
+        """
+        import pandas as pd
+
+        tag_cols = self._collect_tag_columns()
+        fixed_cols = ['series_path', 'table'] + tag_cols + ['field', 
'start_time', 'end_time', 'count']
+
+        rows = []
+        for name in self._series_list:
+            info = self._merged_info[name]
+            row = {
+                'series_path': name,
+                'table': info['table_name'],
+                'field': info['field'],
+                'start_time': info['min_time'],
+                'end_time': info['max_time'],
+                'count': info['count'],
+            }
+            for tc in tag_cols:
+                row[tc] = info['tag_values'].get(tc, '')
+            rows.append(row)
+
+        return pd.DataFrame(rows, columns=fixed_cols)
+
+    def _get_timeseries(self, name: str) -> 'Timeseries':
+        """Internal helper to create a Timeseries object."""
+        return Timeseries(name, self._series_map[name], 
self._merged_timestamps[name])
+
+    def __getitem__(self, key):
+        """Access time series by index, name, slice, or list.
+
+        - tsdf[0] -> Timeseries
+        - tsdf['weather.Beijing.humidity'] -> Timeseries
+        - tsdf[1:3] -> TsFileDataFrame (subset view)
+        - tsdf[[0, 1, 2]] -> TsFileDataFrame (subset view)
+        - tsdf[tsdf['start_time'] > xxx] -> TsFileDataFrame (subset view)
+        """
+        # pandas boolean Series from metadata() filtering
+        try:
+            import pandas as pd
+            if isinstance(key, pd.Series) and key.dtype == bool:
+                selected_names = [self._series_list[i] for i in key.index[key]]
+                return TsFileDataFrame._from_subset(self, selected_names)
+        except ImportError:
+            pass
+
+        if isinstance(key, (int, np.integer)):
+            key = int(key)
+            if key < 0:
+                key = len(self._series_list) + key
+            if key < 0 or key >= len(self._series_list):
+                raise IndexError(f"Index {key} out of range [0, 
{len(self._series_list)})")
+            name = self._series_list[key]
+            return self._get_timeseries(name)
+
+        elif isinstance(key, str):
+            # If the key matches a series name in this dataframe's subset, 
return Timeseries.
+            # Otherwise treat it as a metadata column name and return a pandas 
Series,
+            # mirroring the behaviour of metadata()[key] so that:
+            #   tsdf['ps_id'] == '10'  ->  boolean pd.Series
+            #   tsdf[tsdf['ps_id'] == '10']  ->  TsFileDataFrame (subset)
+            if key in self._name_to_index:
+                return self._get_timeseries(key)
+            # Check if it is a valid metadata column
+            valid_cols = {'table', 'field', 'start_time', 'end_time', 'count'}
+            valid_cols.update(self._collect_tag_columns())
+            if key in valid_cols:
+                import pandas as pd
+                if key == 'table':
+                    values = [self._merged_info[n]['table_name'] for n in 
self._series_list]
+                elif key == 'field':
+                    values = [self._merged_info[n]['field'] for n in 
self._series_list]
+                elif key == 'start_time':
+                    values = [self._merged_info[n]['min_time'] for n in 
self._series_list]
+                elif key == 'end_time':
+                    values = [self._merged_info[n]['max_time'] for n in 
self._series_list]
+                elif key == 'count':
+                    values = [self._merged_info[n]['count'] for n in 
self._series_list]
+                else:
+                    values = [self._merged_info[n]['tag_values'].get(key, '')
+                              for n in self._series_list]
+                return pd.Series(values, name=key)
+            raise KeyError(f"Series not found: '{key}'. "
+                           f"Use df.metadata() to see available metadata 
columns.")
+
+        elif isinstance(key, slice):
+            indices = list(range(*key.indices(len(self._series_list))))
+            selected_names = [self._series_list[i] for i in indices]
+            return TsFileDataFrame._from_subset(self, selected_names)
+
+        elif isinstance(key, list):
+            selected_names = []
+            for k in key:
+                if not isinstance(k, (int, np.integer)):
+                    raise TypeError(f"List index must contain integers, got 
{type(k)}")
+                idx = int(k)
+                if idx < 0:
+                    idx = len(self._series_list) + idx
+                if idx < 0 or idx >= len(self._series_list):
+                    raise IndexError(f"Index {k} out of range [0, 
{len(self._series_list)})")
+                selected_names.append(self._series_list[idx])
+            return TsFileDataFrame._from_subset(self, selected_names)
+
+        else:
+            raise TypeError(f"Unsupported key type: {type(key)}")
+
+    @property
+    def loc(self):
+        """Attribute-style access to the loc indexer."""
+        return _LocIndexer(self)
+
+    def _collect_tag_columns(self) -> List[str]:
+        """Return ordered list of all unique tag column names across all 
series."""
+        seen = {}
+        for name in self._series_list:
+            for col in self._merged_info[name]['tag_columns']:
+                if col not in seen:
+                    seen[col] = True
+        return list(seen.keys())
+
+    def _format_table(self, indices=None, max_rows=20) -> str:
+        """Format series metadata as an aligned text table."""
+        if indices is None:
+            indices = range(len(self._series_list))
+
+        indices = list(indices)
+        total = len(indices)
+
+        if total > max_rows:
+            show_indices = list(indices[:max_rows // 2]) + 
list(indices[-max_rows // 2:])
+            truncated = True
+        else:
+            show_indices = indices
+            truncated = False
+
+        # Determine which tag columns exist across shown rows
+        tag_cols = self._collect_tag_columns()
+
+        rows = []
+        for idx in show_indices:
+            name = self._series_list[idx]
+            info = self._merged_info[name]
+            row = {
+                'index': idx,
+                'table': info['table_name'],
+                'field': info['field'],
+                'start_time': _format_timestamp(info['min_time']),
+                'end_time': _format_timestamp(info['max_time']),
+                'count': info['count'],
+            }
+            for tc in tag_cols:
+                row[tc] = info['tag_values'].get(tc, '')
+            rows.append(row)
+
+        if not rows:
+            return "Empty TsFileDataFrame"
+
+        # Build ordered headers: index, table, tag_cols..., field, start_time, 
end_time, count
+        fixed_headers = ['', 'table'] + tag_cols + ['field', 'start_time', 
'end_time', 'count']
+        col_widths = {h: len(h) for h in fixed_headers}
+        col_widths[''] = max(len(str(r['index'])) for r in rows)
+
+        for r in rows:
+            col_widths[''] = max(col_widths[''], len(str(r['index'])))
+            col_widths['table'] = max(col_widths['table'], len(r['table']))
+            col_widths['field'] = max(col_widths['field'], len(r['field']))
+            col_widths['start_time'] = max(col_widths['start_time'], 
len(r['start_time']))
+            col_widths['end_time'] = max(col_widths['end_time'], 
len(r['end_time']))
+            col_widths['count'] = max(col_widths['count'], 
len(str(r['count'])))
+            for tc in tag_cols:
+                col_widths[tc] = max(col_widths[tc], len(str(r[tc])))
+
+        header_line = "  ".join(h.rjust(col_widths[h]) for h in fixed_headers)
+        lines = [header_line]
+
+        half = len(rows) // 2 if truncated else len(rows)
+        for i, r in enumerate(rows):
+            if truncated and i == half:
+                lines.append("...")
+            parts = [
+                str(r['index']).rjust(col_widths['']),
+                r['table'].rjust(col_widths['table']),
+            ]
+            for tc in tag_cols:
+                parts.append(str(r[tc]).rjust(col_widths[tc]))
+            parts += [
+                r['field'].rjust(col_widths['field']),
+                r['start_time'].rjust(col_widths['start_time']),
+                r['end_time'].rjust(col_widths['end_time']),
+                str(r['count']).rjust(col_widths['count']),
+            ]
+            lines.append("  ".join(parts))
+
+        return "\n".join(lines)
+
+    def __repr__(self):
+        total = len(self._series_list)
+        if self._is_view:
+            root_total = len(self._root._series_list)
+            header = f"TsFileDataFrame({total} time series, subset of 
{root_total})\n"
+        else:
+            header = f"TsFileDataFrame({total} time series, {len(self._paths)} 
files)\n"
+        return header + self._format_table()
+
+    def __str__(self):
+        return self.__repr__()
+
+    def close(self):
+        """Close all underlying readers.
+
+        No-op for subset views (they don't own readers).
+        """
+        if self._is_view:
+            return
+        for reader in self._readers.values():
+            reader.close()
+        self._readers.clear()

Review Comment:
   After `close()`, `_readers` is cleared but `_series_map` still holds 
references to closed readers. Any subsequent data access (e.g., `tsdf[0][0]`) 
will attempt to read from a closed reader, producing an unclear error.
   
   **Recommendation**: Either invalidate `_series_map` too, or set a `_closed` 
flag and check it in data-access paths.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to