Copilot commented on code in PR #765:
URL: https://github.com/apache/tsfile/pull/765#discussion_r3028131286


##########
python/tsfile/tsfile_series_reader.py:
##########
@@ -0,0 +1,440 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+High-performance TsFile Series Reader
+
+Optimized for time series data reading using the Arrow columnar API.
+Wraps TsFileReaderPy (Cython) and provides series-level metadata
+discovery, timestamp caching, and batch reads with TAG filtering.
+"""
+
+import os
+import sys
+from typing import List, Dict, Optional, Tuple
+
+import numpy as np
+import pyarrow.compute as pc
+
+from .constants import ColumnCategory
+from .tsfile_reader import TsFileReaderPy
+
+
+class TsFileSeriesReader:
+    """
+    TsFile Series Reader
+
+    Wrapper around the Cython TsFileReaderPy for reading TsFile data
+    at the series level. Supports TAG columns: a time series is uniquely
+    identified by Table + Tag values + Field column, producing series
+    paths like "weather.beijing.humidity".
+    """
+
+    def __init__(self, file_path: str, show_progress: bool = True):
+        if not os.path.exists(file_path):
+            raise FileNotFoundError(f"TsFile not found: {file_path}")
+
+        self.file_path = file_path
+        self.show_progress = show_progress
+
+        try:
+            self._reader = TsFileReaderPy(file_path)
+        except Exception as e:
+            raise ValueError(f"Failed to open TsFile: {e}")
+
+        self.series_paths: List[str] = []
+        self.series_info: Dict[str, dict] = {}
+        self._timestamps_cache: Dict[str, np.ndarray] = {}
+        self._series_data_cache: Dict[str, np.ndarray] = {}
+
+        self._cache_metadata()
+
+    def __del__(self):
+        self.close()
+
+    def close(self):
+        """Close the underlying Cython reader."""
+        if hasattr(self, '_reader'):
+            try:
+                self._reader.close()
+            except Exception:
+                pass
+
+    def _cache_metadata(self):
+        """Cache metadata from the TsFile."""
+        try:
+            self._cache_metadata_table_model()
+        except Exception as e:
+            raise ValueError(
+                f"Failed to read TsFile metadata. "
+                f"Please ensure the TsFile is valid and readable. Error: {e}"
+            )
+
+    def _cache_metadata_table_model(self):
+        """
+        Cache metadata using table model query via Arrow batch API.
+
+        Unified logic for tables with or without TAG columns.
+        """
+        table_schemas = self._reader.get_all_table_schemas()
+        if not table_schemas:
+            raise ValueError("No tables found in TsFile")
+
+        self.series_paths = []
+        table_names = list(table_schemas.keys())
+
+        # Progress tracking
+        total_rows = 0
+
+        for ti, table_name in enumerate(table_names):
+            table_schema = self._reader.get_table_schema(table_name)
+
+            tag_columns = []
+            field_columns = []
+            for col_schema in table_schema.get_columns():
+                col_name = col_schema.get_column_name()
+                col_category = col_schema.get_category()
+                if col_name.lower() == 'time':
+                    continue
+                if col_category == ColumnCategory.TAG:
+                    tag_columns.append(col_name)
+                elif col_category == ColumnCategory.FIELD:
+                    field_columns.append(col_name)
+
+            if not field_columns:
+                continue
+
+            # Query TAG columns + first FIELD column to discover groups and 
timestamps
+            query_cols = tag_columns + [field_columns[0]]
+
+            time_arrays = []
+            tag_arrays = {tc: [] for tc in tag_columns}
+
+            with self._reader.query_table_batch(
+                table_name, query_cols, batch_size=65536
+            ) as rs:
+                while True:
+                    arrow_table = rs.read_arrow_batch()
+                    if arrow_table is None:
+                        break
+                    batch_rows = arrow_table.num_rows
+                    total_rows += batch_rows
+                    time_arrays.append(arrow_table.column('time').to_numpy())
+                    for tc in tag_columns:
+                        
tag_arrays[tc].append(arrow_table.column(tc).to_numpy())
+
+                    if self.show_progress:
+                        sys.stderr.write(
+                            f"\rReading TsFile metadata: "
+                            f"table {ti + 1}/{len(table_names)} "
+                            f"[{table_name}] "
+                            f"({total_rows:,} rows)"
+                        )
+                        sys.stderr.flush()
+
+            if not time_arrays:
+                continue
+
+            all_times = np.concatenate(time_arrays).astype(np.int64)
+
+            if tag_columns:
+                # Merge tag columns and group by unique tag combinations
+                all_tags = {tc: np.concatenate(tag_arrays[tc]) for tc in 
tag_columns}
+
+                # Build a composite key for grouping
+                if len(tag_columns) == 1:
+                    tag_key = all_tags[tag_columns[0]]
+                    unique_keys = np.unique(tag_key)
+                    for uk in unique_keys:
+                        mask = tag_key == uk
+                        tag_values = (uk,) if not isinstance(uk, tuple) else uk
+                        self._register_tag_group(
+                            table_name, tag_columns, tag_values,
+                            field_columns, all_times[mask]
+                        )
+                else:
+                    # Multiple tag columns: use structured approach
+                    # Convert to list of tuples for grouping
+                    n = len(all_times)
+                    tag_tuples = [
+                        tuple(all_tags[tc][i] for tc in tag_columns)
+                        for i in range(n)
+                    ]
+                    unique_tuples = list(dict.fromkeys(tag_tuples))
+                    for ut in unique_tuples:
+                        mask = np.array([t == ut for t in tag_tuples], 
dtype=bool)
+                        self._register_tag_group(
+                            table_name, tag_columns, ut,
+                            field_columns, all_times[mask]

Review Comment:
   The multi-tag grouping path builds `tag_tuples` in Python and then, for each 
unique tuple, constructs a full boolean mask with a Python loop (`[t == ut for 
t in tag_tuples]`). This is O(n * unique_tags) and will become a bottleneck for 
large tables. Consider using a vectorized approach (e.g., structured NumPy 
array + `np.unique(..., return_inverse=True)` or pandas/groupby) to compute 
groups and indices in (near) linear time.
   ```suggestion
                       # Multiple tag columns: use structured NumPy array for 
grouping
                       n = len(all_times)
                       # Build a structured array with one field per tag column
                       dtype = [(tc, all_tags[tc].dtype) for tc in tag_columns]
                       structured_tags = np.empty(n, dtype=dtype)
                       for tc in tag_columns:
                           structured_tags[tc] = all_tags[tc]
   
                       # Find unique tag combinations and an inverse index for 
grouping
                       unique_vals, inverse = np.unique(
                           structured_tags, return_inverse=True
                       )
   
                       # Group rows by unique tag combination using the inverse 
index
                       for group_id, ut in enumerate(unique_vals):
                           mask = inverse == group_id
                           tag_values = tuple(ut[tc] for tc in tag_columns)
                           self._register_tag_group(
                               table_name,
                               tag_columns,
                               tag_values,
                               field_columns,
                               all_times[mask],
   ```



##########
python/tsfile/tsfile_dataframe.py:
##########
@@ -0,0 +1,859 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+TsFileDataFrame - Lazy-loaded unified view over multiple TsFile files.
+
+Provides array-like and DataFrame-like access to time series data
+stored in TsFile format. Supports:
+1. Pre-training: random index access with array-like slicing
+2. Post-training: time-aligned multi-series queries via .loc
+"""
+
+import os
+import sys
+from collections import defaultdict
+from typing import List, Dict, Union, Optional, Tuple
+from datetime import datetime
+
+import numpy as np
+
+
+def _format_timestamp(ts_ms: int) -> str:
+    """Convert millisecond timestamp to human-readable string."""
+    try:
+        return datetime.fromtimestamp(ts_ms / 1000).strftime('%Y-%m-%d 
%H:%M:%S')
+    except (OSError, ValueError):
+        return str(ts_ms)
+
+
+class AlignedTimeseries:
+    """
+    Time-aligned multi-series query result with timestamps.
+
+    Returned by .loc[...] and df[slice/list]. Supports:
+    - result.timestamps -> np.ndarray of ms timestamps
+    - result.values -> np.ndarray of shape (rows, cols)
+    - result.series_names -> list of series name strings
+    - result[i] / result[i, j] -> index into values
+    - print(result) -> truncated table (20 rows)
+    - result.show() -> full table (no truncation)
+    - result.show(50) -> show up to 50 rows
+    """
+
+    def __init__(self, timestamps: np.ndarray, values: np.ndarray, 
series_names: List[str]):
+        self.timestamps = timestamps
+        self.values = values
+        self.series_names = series_names
+
+    @property
+    def shape(self):
+        return self.values.shape
+
+    def __len__(self):
+        return len(self.timestamps)
+
+    def __getitem__(self, key):
+        return self.values[key]
+
+    def _build_display(self):
+        """Pre-compute string representations for display."""
+        n_rows, n_cols = self.values.shape
+        ts_strs = [_format_timestamp(int(t)) for t in self.timestamps]
+        ts_width = max((len(s) for s in ts_strs), default=0)
+        ts_width = max(ts_width, len('timestamp'))
+
+        col_widths = []
+        val_strs = []
+        for col_idx in range(n_cols):
+            col_name = self.series_names[col_idx] if col_idx < 
len(self.series_names) else f'col_{col_idx}'
+            w = len(col_name)
+            col_vals = []
+            for row_idx in range(n_rows):
+                v = self.values[row_idx, col_idx]
+                s = 'NaN' if np.isnan(v) else f'{v:.2f}'
+                col_vals.append(s)
+                w = max(w, len(s))
+            val_strs.append(col_vals)
+            col_widths.append(w)
+
+        return ts_strs, ts_width, col_widths, val_strs
+
+    def _format_rows(self, ts_strs, ts_width, col_widths, val_strs, max_rows):
+        """Format rows with optional truncation."""
+        n_rows = len(ts_strs)
+        n_cols = len(col_widths)
+
+        header_parts = ['timestamp'.rjust(ts_width)]
+        for col_idx in range(n_cols):
+            col_name = self.series_names[col_idx] if col_idx < 
len(self.series_names) else f'col_{col_idx}'
+            header_parts.append(col_name.rjust(col_widths[col_idx]))
+        lines = ['  '.join(header_parts)]
+
+        if max_rows is None or n_rows <= max_rows:
+            show_rows = list(range(n_rows))
+        else:
+            show_rows = list(range(max_rows))
+
+        for row_idx in show_rows:
+            parts = [ts_strs[row_idx].rjust(ts_width)]
+            for col_idx in range(n_cols):
+                
parts.append(val_strs[col_idx][row_idx].rjust(col_widths[col_idx]))
+            lines.append('  '.join(parts))
+
+        return f"AlignedTimeseries({n_rows} rows, {n_cols} series)\n" + 
'\n'.join(lines)
+
+    def __repr__(self):
+        n_rows, n_cols = self.values.shape
+        if n_rows == 0:
+            return f"AlignedTimeseries(0 rows, {n_cols} series)"
+        ts_strs, ts_width, col_widths, val_strs = self._build_display()
+        return self._format_rows(ts_strs, ts_width, col_widths, val_strs, 
max_rows=20)
+
+    def show(self, max_rows: Optional[int] = None):
+        """Print formatted table with configurable row limit.
+
+        Args:
+            max_rows: Maximum rows to display. None for all rows.
+        """
+        n_rows, n_cols = self.values.shape
+        if n_rows == 0:
+            print(f"AlignedTimeseries(0 rows, {n_cols} series)")
+            return
+        ts_strs, ts_width, col_widths, val_strs = self._build_display()
+        print(self._format_rows(ts_strs, ts_width, col_widths, val_strs, 
max_rows))
+
+
+class Timeseries:
+    """
+    Single time series abstraction.
+
+    Supports row-based slicing (converting to time-range queries internally),
+    stats access, and length queries.
+
+    When a series spans multiple files, data is merged transparently.
+    """
+
+    def __init__(self, name: str, readers_and_infos: list, merged_timestamps: 
np.ndarray):
+        """
+        Args:
+            name: Full series path (e.g., "weather.Beijing.humidity").
+            readers_and_infos: List of (reader, series_info) tuples for this 
series.
+            merged_timestamps: Sorted, deduplicated timestamp array across all 
files.
+        """
+        self._name = name
+        self._readers_and_infos = readers_and_infos
+        self._timestamps = merged_timestamps
+
+    @property
+    def name(self) -> str:
+        return self._name
+
+    @property
+    def timestamps(self) -> np.ndarray:
+        return self._timestamps
+
+    @property
+    def stats(self) -> dict:
+        count = len(self._timestamps)
+        if count == 0:
+            return {'start_time': None, 'end_time': None, 'count': 0}
+        return {
+            'start_time': int(self._timestamps[0]),
+            'end_time': int(self._timestamps[-1]),
+            'count': count,
+        }
+
+    def __len__(self) -> int:
+        return len(self._timestamps)
+
+    def __getitem__(self, key):
+        """Row-based access.
+
+        - series[20] -> single float value
+        - series[20:100] -> np.ndarray of values
+        """
+        length = len(self._timestamps)
+
+        if isinstance(key, int):
+            if key < 0:
+                key = length + key
+            if key < 0 or key >= length:
+                raise IndexError(f"Index {key} out of range [0, {length})")
+            ts = int(self._timestamps[key])
+            _, vals = self._query_time_range(ts, ts)
+            return float(vals[0]) if len(vals) > 0 else None
+
+        elif isinstance(key, slice):
+            start, stop, step = key.indices(length)
+            if start >= stop:
+                return np.array([], dtype=np.float64)
+
+            # Get exact timestamps for the requested rows
+            requested_ts = self._timestamps[start:stop]
+            if len(requested_ts) == 0:
+                return np.array([], dtype=np.float64)
+
+            # Query by time range, then filter to exact timestamps
+            start_ts = int(requested_ts[0])
+            end_ts = int(requested_ts[-1])
+            ts_arr, vals = self._query_time_range(start_ts, end_ts)
+
+            # Vectorized alignment: both ts_arr and requested_ts are sorted
+            result = np.full(len(requested_ts), np.nan)
+            if len(ts_arr) > 0:
+                indices = np.searchsorted(ts_arr, requested_ts)
+                valid = (indices < len(ts_arr)) & (ts_arr[np.minimum(indices, 
len(ts_arr) - 1)] == requested_ts)
+                result[valid] = vals[indices[valid]]
+            vals = result
+
+            if step != 1:
+                vals = vals[::step]
+            return vals
+
+        else:
+            raise TypeError(f"Unsupported key type: {type(key)}")
+
+    def _query_time_range(self, start_time: int, end_time: int) -> 
Tuple[np.ndarray, np.ndarray]:
+        """Query all readers for this series in the given time range, merge 
results."""
+        all_ts = []
+        all_vals = []
+        for reader, info in self._readers_and_infos:
+            # Skip reader if its data doesn't overlap
+            if info['max_time'] < start_time or info['min_time'] > end_time:
+                continue
+            ts_arr, val_arr = reader.read_series_by_time_range(
+                self._name, start_time, end_time
+            )
+            if len(ts_arr) > 0:
+                all_ts.append(ts_arr)
+                all_vals.append(val_arr)
+
+        if not all_ts:
+            return np.array([], dtype=np.int64), np.array([], dtype=np.float64)
+
+        if len(all_ts) == 1:
+            return all_ts[0], all_vals[0]
+
+        # Merge from multiple readers, sort by timestamp, deduplicate
+        merged_ts = np.concatenate(all_ts)
+        merged_vals = np.concatenate(all_vals)
+        sort_idx = np.argsort(merged_ts, kind='mergesort')
+        merged_ts = merged_ts[sort_idx]
+        merged_vals = merged_vals[sort_idx]
+
+        # Deduplicate by timestamp (keep first occurrence)
+        _, unique_idx = np.unique(merged_ts, return_index=True)
+        return merged_ts[unique_idx], merged_vals[unique_idx]
+
+    def __repr__(self):
+        stats = self.stats
+        if stats['count'] == 0:
+            return f"Timeseries('{self._name}', count=0)"
+        return (
+            f"Timeseries('{self._name}', count={stats['count']}, "
+            f"start={_format_timestamp(stats['start_time'])}, "
+            f"end={_format_timestamp(stats['end_time'])})"
+        )
+
+
+class _LocIndexer:
+    """
+    Implements .loc[start_time:end_time, series_list] for time-aligned queries.
+
+    Returns AlignedTimeseries with timestamps, values, and series names.
+    """
+
+    def __init__(self, dataframe: 'TsFileDataFrame'):
+        self._df = dataframe
+
+    def _parse_key(self, key):
+        """Parse key into (start_time, end_time, series_names)."""
+        if not isinstance(key, tuple) or len(key) != 2:
+            raise ValueError(
+                "loc requires exactly 2 arguments: "
+                "tsdf.loc[start_time:end_time, series_list]"
+            )
+
+        time_slice, series_spec = key
+
+        # Parse time range
+        if isinstance(time_slice, slice):
+            start_time = time_slice.start
+            end_time = time_slice.stop
+            if start_time is None:
+                start_time = np.iinfo(np.int64).min
+            if end_time is None:
+                end_time = np.iinfo(np.int64).max
+        elif isinstance(time_slice, (int, np.integer)):
+            start_time = end_time = int(time_slice)
+        else:
+            raise TypeError(f"Time index must be slice or int, got 
{type(time_slice)}")
+
+        # Parse series names
+        if isinstance(series_spec, (str, int)):
+            series_spec = [series_spec]
+
+        series_names = []
+        for s in series_spec:
+            if isinstance(s, (int, np.integer)):
+                idx = int(s)
+                if idx < 0 or idx >= len(self._df._series_list):
+                    raise IndexError(f"Series index {idx} out of range")
+                series_names.append(self._df._series_list[idx])
+            elif isinstance(s, str):
+                if s not in self._df._series_map:
+                    raise KeyError(f"Series not found: {s}")
+                series_names.append(s)
+            else:
+                raise TypeError(f"Series specifier must be int or str, got 
{type(s)}")
+
+        return start_time, end_time, series_names
+
+    def _query_aligned(self, start_time: int, end_time: int, series_names: 
List[str]):
+        """Query and align multiple series using batched Arrow reads."""
+        # Group series by (reader_id, table_name, tag_tuple) for batch queries.
+        # Each group can be fetched with a single query_table_batch call.
+        groups = defaultdict(list)  # key -> [(col_idx, field_name, 
series_name, reader, info)]
+
+        for col_idx, name in enumerate(series_names):
+            entries = self._df._series_map[name]
+            for reader, info in entries:
+                if info['max_time'] < start_time or info['min_time'] > 
end_time:
+                    continue
+                key = (id(reader), info['table_name'],
+                       tuple(sorted(info['tag_values'].items())))
+                groups[key].append((col_idx, info['column_name'], name, 
reader, info))
+
+        # Fetch data: one query per group
+        series_data = {}  # series_name -> (ts_arr, val_arr)
+
+        for key, entries in groups.items():
+            reader = entries[0][3]
+            info = entries[0][4]
+            field_columns = list(dict.fromkeys(e[1] for e in entries))  # 
dedupe, keep order
+
+            ts_arr, field_vals = reader.read_multi_series_by_time_range(
+                info['table_name'], field_columns,
+                info['tag_columns'], info['tag_values'],
+                start_time, end_time,
+            )
+
+            for _col_idx, field_name, name, _, _ in entries:
+                if name in series_data:
+                    # Series spans multiple readers: merge
+                    prev_ts, prev_val = series_data[name]
+                    merged_ts = np.concatenate([prev_ts, ts_arr])
+                    merged_val = np.concatenate([prev_val, 
field_vals[field_name]])
+                    sort_idx = np.argsort(merged_ts, kind='mergesort')
+                    merged_ts = merged_ts[sort_idx]
+                    merged_val = merged_val[sort_idx]
+                    _, unique_idx = np.unique(merged_ts, return_index=True)
+                    series_data[name] = (merged_ts[unique_idx], 
merged_val[unique_idx])
+                else:
+                    series_data[name] = (ts_arr, field_vals[field_name])
+
+        # Collect unique timestamps using numpy
+        all_ts_arrays = [data[0] for data in series_data.values() if 
len(data[0]) > 0]
+        if not all_ts_arrays:
+            return (np.array([], dtype=np.int64),
+                    np.array([]).reshape(0, len(series_names)))
+
+        sorted_timestamps = np.unique(np.concatenate(all_ts_arrays))
+
+        # Build result matrix using np.searchsorted for vectorized alignment
+        result = np.full((len(sorted_timestamps), len(series_names)), np.nan)
+
+        for col_idx, name in enumerate(series_names):
+            if name not in series_data:
+                continue
+            ts_arr, val_arr = series_data[name]
+            if len(ts_arr) == 0:
+                continue
+            indices = np.searchsorted(sorted_timestamps, ts_arr)
+            result[indices, col_idx] = val_arr
+
+        return sorted_timestamps, result
+
+    def __getitem__(self, key) -> 'AlignedTimeseries':
+        start_time, end_time, series_names = self._parse_key(key)
+        timestamps, values = self._query_aligned(start_time, end_time, 
series_names)
+        return AlignedTimeseries(timestamps, values, series_names)
+
+
+class TsFileDataFrame:
+    """
+    Lazy-loaded unified view over multiple TsFile files.
+
+    Each dimension is a time series identified by Table + Tags + Field.
+    Supports cross-file merging of same-named series.
+    """
+

Review Comment:
   This PR adds a sizable new public API surface (`TsFileDataFrame`, `.loc`, 
cross-file merge semantics, and `TsFileSeriesReader`-based reading), but there 
are no accompanying unit tests. The repo already has pytest-based coverage for 
readers/writers and Arrow batch queries; please add tests that cover: series 
discovery across multiple files, `Timeseries` row slicing, `.loc` alignment for 
multiple series, and behavior with missing/null values.



##########
python/tsfile/tsfile_series_reader.py:
##########
@@ -0,0 +1,440 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+High-performance TsFile Series Reader
+
+Optimized for time series data reading using the Arrow columnar API.
+Wraps TsFileReaderPy (Cython) and provides series-level metadata
+discovery, timestamp caching, and batch reads with TAG filtering.
+"""
+
+import os
+import sys
+from typing import List, Dict, Optional, Tuple
+
+import numpy as np
+import pyarrow.compute as pc
+
+from .constants import ColumnCategory
+from .tsfile_reader import TsFileReaderPy
+
+
+class TsFileSeriesReader:
+    """
+    TsFile Series Reader
+
+    Wrapper around the Cython TsFileReaderPy for reading TsFile data
+    at the series level. Supports TAG columns: a time series is uniquely
+    identified by Table + Tag values + Field column, producing series
+    paths like "weather.beijing.humidity".
+    """
+
+    def __init__(self, file_path: str, show_progress: bool = True):
+        if not os.path.exists(file_path):
+            raise FileNotFoundError(f"TsFile not found: {file_path}")
+
+        self.file_path = file_path
+        self.show_progress = show_progress
+
+        try:
+            self._reader = TsFileReaderPy(file_path)
+        except Exception as e:
+            raise ValueError(f"Failed to open TsFile: {e}")
+
+        self.series_paths: List[str] = []
+        self.series_info: Dict[str, dict] = {}
+        self._timestamps_cache: Dict[str, np.ndarray] = {}
+        self._series_data_cache: Dict[str, np.ndarray] = {}
+
+        self._cache_metadata()
+
+    def __del__(self):
+        self.close()
+
+    def close(self):
+        """Close the underlying Cython reader."""
+        if hasattr(self, '_reader'):
+            try:
+                self._reader.close()
+            except Exception:
+                pass
+
+    def _cache_metadata(self):
+        """Cache metadata from the TsFile."""
+        try:
+            self._cache_metadata_table_model()
+        except Exception as e:
+            raise ValueError(
+                f"Failed to read TsFile metadata. "
+                f"Please ensure the TsFile is valid and readable. Error: {e}"
+            )
+
+    def _cache_metadata_table_model(self):
+        """
+        Cache metadata using table model query via Arrow batch API.
+
+        Unified logic for tables with or without TAG columns.
+        """
+        table_schemas = self._reader.get_all_table_schemas()
+        if not table_schemas:
+            raise ValueError("No tables found in TsFile")
+
+        self.series_paths = []
+        table_names = list(table_schemas.keys())
+
+        # Progress tracking
+        total_rows = 0
+
+        for ti, table_name in enumerate(table_names):
+            table_schema = self._reader.get_table_schema(table_name)
+
+            tag_columns = []
+            field_columns = []
+            for col_schema in table_schema.get_columns():
+                col_name = col_schema.get_column_name()
+                col_category = col_schema.get_category()
+                if col_name.lower() == 'time':
+                    continue
+                if col_category == ColumnCategory.TAG:
+                    tag_columns.append(col_name)
+                elif col_category == ColumnCategory.FIELD:
+                    field_columns.append(col_name)
+
+            if not field_columns:
+                continue
+
+            # Query TAG columns + first FIELD column to discover groups and 
timestamps
+            query_cols = tag_columns + [field_columns[0]]
+
+            time_arrays = []
+            tag_arrays = {tc: [] for tc in tag_columns}
+
+            with self._reader.query_table_batch(
+                table_name, query_cols, batch_size=65536
+            ) as rs:
+                while True:
+                    arrow_table = rs.read_arrow_batch()
+                    if arrow_table is None:
+                        break
+                    batch_rows = arrow_table.num_rows
+                    total_rows += batch_rows
+                    time_arrays.append(arrow_table.column('time').to_numpy())
+                    for tc in tag_columns:
+                        
tag_arrays[tc].append(arrow_table.column(tc).to_numpy())
+
+                    if self.show_progress:
+                        sys.stderr.write(
+                            f"\rReading TsFile metadata: "
+                            f"table {ti + 1}/{len(table_names)} "
+                            f"[{table_name}] "
+                            f"({total_rows:,} rows)"
+                        )
+                        sys.stderr.flush()
+
+            if not time_arrays:
+                continue
+
+            all_times = np.concatenate(time_arrays).astype(np.int64)
+
+            if tag_columns:
+                # Merge tag columns and group by unique tag combinations
+                all_tags = {tc: np.concatenate(tag_arrays[tc]) for tc in 
tag_columns}
+
+                # Build a composite key for grouping
+                if len(tag_columns) == 1:
+                    tag_key = all_tags[tag_columns[0]]
+                    unique_keys = np.unique(tag_key)
+                    for uk in unique_keys:
+                        mask = tag_key == uk
+                        tag_values = (uk,) if not isinstance(uk, tuple) else uk
+                        self._register_tag_group(
+                            table_name, tag_columns, tag_values,
+                            field_columns, all_times[mask]
+                        )
+                else:
+                    # Multiple tag columns: use structured approach
+                    # Convert to list of tuples for grouping
+                    n = len(all_times)
+                    tag_tuples = [
+                        tuple(all_tags[tc][i] for tc in tag_columns)
+                        for i in range(n)
+                    ]
+                    unique_tuples = list(dict.fromkeys(tag_tuples))
+                    for ut in unique_tuples:
+                        mask = np.array([t == ut for t in tag_tuples], 
dtype=bool)
+                        self._register_tag_group(
+                            table_name, tag_columns, ut,
+                            field_columns, all_times[mask]
+                        )
+            else:
+                # No TAG columns: single group
+                self._register_tag_group(
+                    table_name, tag_columns, (),
+                    field_columns, all_times
+                )
+
+        if self.show_progress and total_rows > 0:
+            sys.stderr.write(
+                f"\rReading TsFile metadata: "
+                f"{len(table_names)} table(s), "
+                f"{total_rows:,} rows, "
+                f"{len(self.series_paths)} series "
+                f"... done\n"
+            )
+            sys.stderr.flush()
+
+        if not self.series_paths:
+            raise ValueError("No valid numeric series found in TsFile")
+
+    def _register_tag_group(
+        self, table_name: str, tag_columns: List[str],
+        tag_values: tuple, field_columns: List[str], timestamps: np.ndarray
+    ):
+        """Register all field series for a given table + tag group."""
+        timestamps = np.sort(timestamps)
+
+        if len(timestamps) == 0:
+            return
+
+        if tag_columns:
+            tag_part = ".".join(str(v) for v in tag_values)
+        else:
+            tag_part = ""
+
+        tag_values_dict = dict(zip(tag_columns, tag_values)) if tag_columns 
else {}
+
+        for field_col in field_columns:
+            if tag_part:
+                series_path = f"{table_name}.{tag_part}.{field_col}"
+            else:
+                series_path = f"{table_name}.{field_col}"
+
+            self.series_paths.append(series_path)
+            self._timestamps_cache[series_path] = timestamps
+            self.series_info[series_path] = {
+                'length': len(timestamps),
+                'min_time': int(timestamps[0]),
+                'max_time': int(timestamps[-1]),
+                'table_name': table_name,
+                'column_name': field_col,
+                'tag_columns': tag_columns,
+                'tag_values': tag_values_dict,
+            }
+
+    def get_all_series(self) -> List[str]:
+        """Return a list of all discovered series paths."""
+        return self.series_paths.copy()
+
+    def get_series_length(self, series_path: str) -> int:
+        """Return the number of data points for a series."""
+        if series_path not in self.series_info:
+            raise ValueError(f"Series not found: {series_path}")
+        return self.series_info[series_path]['length']
+
+    def read_series(self, series_path: str) -> List[float]:
+        """Read all data points for a series.
+
+        Args:
+            series_path: Time series path.
+
+        Returns:
+            List of data points.
+        """
+        if series_path not in self.series_info:
+            raise ValueError(f"Series not found: {series_path}")
+        if series_path in self._series_data_cache:
+            return self._series_data_cache[series_path].tolist()
+        length = self.series_info[series_path]['length']
+        return self.read_series_range(series_path, 0, length)
+
+    def read_series_range(self, series_path: str, start: int, end: int) -> 
List[float]:
+        """Read specified range of time series by row index.
+
+        Args:
+            series_path: Time series path.
+            start: Start index (inclusive).
+            end: End index (exclusive).
+
+        Returns:
+            List of data points.
+        """
+        if series_path not in self.series_info:
+            raise ValueError(f"Series not found: {series_path}")
+
+        if series_path in self._series_data_cache:
+            return self._series_data_cache[series_path][start:end].tolist()
+
+        info = self.series_info[series_path]
+        timestamps = self._timestamps_cache[series_path]
+

Review Comment:
   `read_series_range()` indexes `timestamps[start]` and `timestamps[end - 1]` 
without validating `start/end` (including `start == end`, negative indices, or 
`end > length`). This currently raises confusing IndexErrors and makes empty 
slices impossible. Add explicit range validation and return an empty result for 
`start >= end` to match the docstring’s [start, end) semantics.
   ```suggestion
           if start < 0 or end < 0:
               raise ValueError("start and end indices must be non-negative")
   
           # Handle cached series data first, using [start, end) semantics
           if series_path in self._series_data_cache:
               data = self._series_data_cache[series_path]
               length = len(data)
   
               if start >= length:
                   return []
               if end > length:
                   end = length
               if start >= end:
                   return []
   
               return data[start:end].tolist()
   
           info = self.series_info[series_path]
           timestamps = self._timestamps_cache[series_path]
           length = len(timestamps)
   
           if start >= length:
               return []
           if end > length:
               end = length
           if start >= end:
               return []
   ```



##########
python/tsfile/tsfile_series_reader.py:
##########
@@ -0,0 +1,440 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+High-performance TsFile Series Reader
+
+Optimized for time series data reading using the Arrow columnar API.
+Wraps TsFileReaderPy (Cython) and provides series-level metadata
+discovery, timestamp caching, and batch reads with TAG filtering.
+"""
+
+import os
+import sys
+from typing import List, Dict, Optional, Tuple
+
+import numpy as np
+import pyarrow.compute as pc
+
+from .constants import ColumnCategory
+from .tsfile_reader import TsFileReaderPy
+
+
+class TsFileSeriesReader:
+    """
+    TsFile Series Reader
+
+    Wrapper around the Cython TsFileReaderPy for reading TsFile data
+    at the series level. Supports TAG columns: a time series is uniquely
+    identified by Table + Tag values + Field column, producing series
+    paths like "weather.beijing.humidity".
+    """
+
+    def __init__(self, file_path: str, show_progress: bool = True):
+        if not os.path.exists(file_path):
+            raise FileNotFoundError(f"TsFile not found: {file_path}")
+
+        self.file_path = file_path
+        self.show_progress = show_progress
+
+        try:
+            self._reader = TsFileReaderPy(file_path)
+        except Exception as e:
+            raise ValueError(f"Failed to open TsFile: {e}")
+
+        self.series_paths: List[str] = []
+        self.series_info: Dict[str, dict] = {}
+        self._timestamps_cache: Dict[str, np.ndarray] = {}
+        self._series_data_cache: Dict[str, np.ndarray] = {}
+
+        self._cache_metadata()
+
+    def __del__(self):
+        self.close()
+
+    def close(self):
+        """Close the underlying Cython reader."""
+        if hasattr(self, '_reader'):
+            try:
+                self._reader.close()
+            except Exception:
+                pass
+
+    def _cache_metadata(self):
+        """Cache metadata from the TsFile."""
+        try:
+            self._cache_metadata_table_model()
+        except Exception as e:
+            raise ValueError(
+                f"Failed to read TsFile metadata. "
+                f"Please ensure the TsFile is valid and readable. Error: {e}"
+            )
+
+    def _cache_metadata_table_model(self):
+        """
+        Cache metadata using table model query via Arrow batch API.
+
+        Unified logic for tables with or without TAG columns.
+        """
+        table_schemas = self._reader.get_all_table_schemas()
+        if not table_schemas:
+            raise ValueError("No tables found in TsFile")
+
+        self.series_paths = []
+        table_names = list(table_schemas.keys())
+
+        # Progress tracking
+        total_rows = 0
+
+        for ti, table_name in enumerate(table_names):
+            table_schema = self._reader.get_table_schema(table_name)
+
+            tag_columns = []
+            field_columns = []
+            for col_schema in table_schema.get_columns():
+                col_name = col_schema.get_column_name()
+                col_category = col_schema.get_category()
+                if col_name.lower() == 'time':
+                    continue
+                if col_category == ColumnCategory.TAG:
+                    tag_columns.append(col_name)
+                elif col_category == ColumnCategory.FIELD:
+                    field_columns.append(col_name)
+
+            if not field_columns:
+                continue
+
+            # Query TAG columns + first FIELD column to discover groups and 
timestamps
+            query_cols = tag_columns + [field_columns[0]]
+
+            time_arrays = []
+            tag_arrays = {tc: [] for tc in tag_columns}
+
+            with self._reader.query_table_batch(
+                table_name, query_cols, batch_size=65536
+            ) as rs:
+                while True:
+                    arrow_table = rs.read_arrow_batch()
+                    if arrow_table is None:
+                        break
+                    batch_rows = arrow_table.num_rows
+                    total_rows += batch_rows
+                    time_arrays.append(arrow_table.column('time').to_numpy())
+                    for tc in tag_columns:
+                        
tag_arrays[tc].append(arrow_table.column(tc).to_numpy())
+
+                    if self.show_progress:
+                        sys.stderr.write(
+                            f"\rReading TsFile metadata: "
+                            f"table {ti + 1}/{len(table_names)} "
+                            f"[{table_name}] "
+                            f"({total_rows:,} rows)"
+                        )
+                        sys.stderr.flush()
+
+            if not time_arrays:
+                continue
+
+            all_times = np.concatenate(time_arrays).astype(np.int64)
+
+            if tag_columns:
+                # Merge tag columns and group by unique tag combinations
+                all_tags = {tc: np.concatenate(tag_arrays[tc]) for tc in 
tag_columns}
+
+                # Build a composite key for grouping
+                if len(tag_columns) == 1:
+                    tag_key = all_tags[tag_columns[0]]
+                    unique_keys = np.unique(tag_key)
+                    for uk in unique_keys:
+                        mask = tag_key == uk
+                        tag_values = (uk,) if not isinstance(uk, tuple) else uk
+                        self._register_tag_group(
+                            table_name, tag_columns, tag_values,
+                            field_columns, all_times[mask]
+                        )
+                else:
+                    # Multiple tag columns: use structured approach
+                    # Convert to list of tuples for grouping
+                    n = len(all_times)
+                    tag_tuples = [
+                        tuple(all_tags[tc][i] for tc in tag_columns)
+                        for i in range(n)
+                    ]
+                    unique_tuples = list(dict.fromkeys(tag_tuples))
+                    for ut in unique_tuples:
+                        mask = np.array([t == ut for t in tag_tuples], 
dtype=bool)
+                        self._register_tag_group(
+                            table_name, tag_columns, ut,
+                            field_columns, all_times[mask]
+                        )
+            else:
+                # No TAG columns: single group
+                self._register_tag_group(
+                    table_name, tag_columns, (),
+                    field_columns, all_times
+                )
+
+        if self.show_progress and total_rows > 0:
+            sys.stderr.write(
+                f"\rReading TsFile metadata: "
+                f"{len(table_names)} table(s), "
+                f"{total_rows:,} rows, "
+                f"{len(self.series_paths)} series "
+                f"... done\n"
+            )
+            sys.stderr.flush()
+
+        if not self.series_paths:
+            raise ValueError("No valid numeric series found in TsFile")
+
+    def _register_tag_group(
+        self, table_name: str, tag_columns: List[str],
+        tag_values: tuple, field_columns: List[str], timestamps: np.ndarray
+    ):
+        """Register all field series for a given table + tag group."""
+        timestamps = np.sort(timestamps)
+
+        if len(timestamps) == 0:
+            return
+
+        if tag_columns:
+            tag_part = ".".join(str(v) for v in tag_values)
+        else:
+            tag_part = ""
+
+        tag_values_dict = dict(zip(tag_columns, tag_values)) if tag_columns 
else {}
+
+        for field_col in field_columns:
+            if tag_part:
+                series_path = f"{table_name}.{tag_part}.{field_col}"
+            else:
+                series_path = f"{table_name}.{field_col}"
+
+            self.series_paths.append(series_path)
+            self._timestamps_cache[series_path] = timestamps
+            self.series_info[series_path] = {
+                'length': len(timestamps),
+                'min_time': int(timestamps[0]),
+                'max_time': int(timestamps[-1]),
+                'table_name': table_name,
+                'column_name': field_col,
+                'tag_columns': tag_columns,
+                'tag_values': tag_values_dict,
+            }
+
+    def get_all_series(self) -> List[str]:
+        """Return a list of all discovered series paths."""
+        return self.series_paths.copy()
+
+    def get_series_length(self, series_path: str) -> int:
+        """Return the number of data points for a series."""
+        if series_path not in self.series_info:
+            raise ValueError(f"Series not found: {series_path}")
+        return self.series_info[series_path]['length']
+
+    def read_series(self, series_path: str) -> List[float]:
+        """Read all data points for a series.
+
+        Args:
+            series_path: Time series path.
+
+        Returns:
+            List of data points.
+        """
+        if series_path not in self.series_info:
+            raise ValueError(f"Series not found: {series_path}")
+        if series_path in self._series_data_cache:
+            return self._series_data_cache[series_path].tolist()
+        length = self.series_info[series_path]['length']
+        return self.read_series_range(series_path, 0, length)
+
+    def read_series_range(self, series_path: str, start: int, end: int) -> 
List[float]:
+        """Read specified range of time series by row index.
+
+        Args:
+            series_path: Time series path.
+            start: Start index (inclusive).
+            end: End index (exclusive).
+
+        Returns:
+            List of data points.
+        """
+        if series_path not in self.series_info:
+            raise ValueError(f"Series not found: {series_path}")
+
+        if series_path in self._series_data_cache:
+            return self._series_data_cache[series_path][start:end].tolist()
+
+        info = self.series_info[series_path]
+        timestamps = self._timestamps_cache[series_path]
+
+        start_time = int(timestamps[start])
+        end_time = int(timestamps[end - 1])
+
+        _, vals = self._read_arrow(
+            info['table_name'],
+            [info['column_name']],
+            info['tag_columns'],
+            info['tag_values'],
+            start_time, end_time,
+        )
+        return vals[info['column_name']].tolist()
+
+    def read_series_by_time_range(
+        self, series_path: str, start_time: int, end_time: int
+    ) -> Tuple[np.ndarray, np.ndarray]:
+        """Read data by time range directly (for loc-style queries).
+
+        Args:
+            series_path: Time series path.
+            start_time: Start timestamp (inclusive, ms).
+            end_time: End timestamp (inclusive, ms).
+
+        Returns:
+            Tuple of (timestamps_array, values_array).
+        """
+        if series_path not in self.series_info:
+            raise ValueError(f"Series not found: {series_path}")
+
+        info = self.series_info[series_path]
+        ts_arr, field_vals = self._read_arrow(
+            info['table_name'],
+            [info['column_name']],
+            info['tag_columns'],
+            info['tag_values'],
+            start_time, end_time,
+        )
+        if len(ts_arr) > 0:
+            return ts_arr, field_vals[info['column_name']]
+        return np.array([], dtype=np.int64), np.array([], dtype=np.float64)
+
+    def read_multi_series_by_time_range(
+        self,
+        table_name: str,
+        field_columns: List[str],
+        tag_columns: List[str],
+        tag_values: Dict[str, str],
+        start_time: int,
+        end_time: int,
+    ) -> Tuple[np.ndarray, Dict[str, np.ndarray]]:
+        """Read multiple field columns from the same table+tag group in one 
query.
+
+        Args:
+            table_name: TsFile table name.
+            field_columns: List of field column names to read.
+            tag_columns: List of tag column names.
+            tag_values: Dict of tag column name to tag value.
+            start_time: Start timestamp (inclusive, ms).
+            end_time: End timestamp (inclusive, ms).
+
+        Returns:
+            (timestamps_array, {field_name: values_array}).
+        """
+        return self._read_arrow(
+            table_name, field_columns, tag_columns, tag_values,
+            start_time, end_time,
+        )
+
+    def _read_arrow(
+        self,
+        table_name: str,
+        field_columns: List[str],
+        tag_columns: List[str],
+        tag_values: Dict[str, str],
+        start_time: int,
+        end_time: int,
+    ) -> Tuple[np.ndarray, Dict[str, np.ndarray]]:
+        """Core Arrow batch reader.
+
+        Read one or more field columns from a single table+tag group
+        via query_table_batch + read_arrow_batch.
+
+        Args:
+            table_name: TsFile table name.
+            field_columns: Field columns to read.
+            tag_columns: Tag column names (for query).
+            tag_values: Tag filter values.
+            start_time: Start timestamp (inclusive, ms).
+            end_time: End timestamp (inclusive, ms).
+
+        Returns:
+            (timestamps_array, {field_name: values_array}).
+        """
+        if tag_columns:
+            query_cols = tag_columns + field_columns
+        else:
+            query_cols = list(field_columns)
+
+        ts_list = []
+        field_lists = {fc: [] for fc in field_columns}
+
+        with self._reader.query_table_batch(
+            table_name, query_cols,
+            start_time=start_time, end_time=end_time, batch_size=65536
+        ) as rs:
+            while True:
+                arrow_table = rs.read_arrow_batch()
+                if arrow_table is None:
+                    break
+
+                if tag_values:
+                    mask = None
+                    for tag_col, tag_val in tag_values.items():
+                        col_mask = pc.equal(arrow_table.column(tag_col), 
tag_val)
+                        mask = col_mask if mask is None else pc.and_(mask, 
col_mask)
+                    arrow_table = arrow_table.filter(mask)
+
+                if arrow_table.num_rows > 0:
+                    ts_list.append(arrow_table.column('time').to_numpy())
+                    for fc in field_columns:
+                        field_lists[fc].append(
+                            
arrow_table.column(fc).to_numpy().astype(np.float64)
+                        )

Review Comment:
   In `_read_arrow`, values are always converted via 
`to_numpy().astype(np.float64)`. This will fail for FIELD columns that are not 
numeric (e.g., STRING/TEXT/BLOB/DATE) and can also fail when the Arrow array 
contains nulls (writing code allows NaN values, which become nulls on read). 
Consider filtering `field_columns` up-front to numeric TSDataType only, and/or 
using Arrow casting + null-to-NaN handling before converting to NumPy (and 
keeping non-numeric columns out of this reader).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to