ycycse commented on code in PR #765:
URL: https://github.com/apache/tsfile/pull/765#discussion_r3038774430


##########
python/tsfile/tsfile_dataframe.py:
##########
@@ -0,0 +1,859 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+TsFileDataFrame - Lazy-loaded unified view over multiple TsFile files.
+
+Provides array-like and DataFrame-like access to time series data
+stored in TsFile format. Supports:
+1. Pre-training: random index access with array-like slicing
+2. Post-training: time-aligned multi-series queries via .loc
+"""
+
+import os
+import sys
+from collections import defaultdict
+from typing import List, Dict, Union, Optional, Tuple
+from datetime import datetime
+
+import numpy as np
+
+
+def _format_timestamp(ts_ms: int) -> str:
+    """Convert millisecond timestamp to human-readable string."""
+    try:
+        return datetime.fromtimestamp(ts_ms / 1000).strftime('%Y-%m-%d %H:%M:%S')
+    except (OSError, ValueError):
+        return str(ts_ms)
+
+
+class AlignedTimeseries:
+    """
+    Time-aligned multi-series query result with timestamps.
+
+    Returned by .loc[...] and df[slice/list]. Supports:
+    - result.timestamps -> np.ndarray of ms timestamps
+    - result.values -> np.ndarray of shape (rows, cols)
+    - result.series_names -> list of series name strings
+    - result[i] / result[i, j] -> index into values
+    - print(result) -> truncated table (20 rows)
+    - result.show() -> full table (no truncation)
+    - result.show(50) -> show up to 50 rows
+    """
+
+    def __init__(self, timestamps: np.ndarray, values: np.ndarray, series_names: List[str]):
+        self.timestamps = timestamps
+        self.values = values
+        self.series_names = series_names
+
+    @property
+    def shape(self):
+        return self.values.shape
+
+    def __len__(self):
+        return len(self.timestamps)
+
+    def __getitem__(self, key):
+        return self.values[key]
+
+    def _build_display(self):
+        """Pre-compute string representations for display."""
+        n_rows, n_cols = self.values.shape
+        ts_strs = [_format_timestamp(int(t)) for t in self.timestamps]
+        ts_width = max((len(s) for s in ts_strs), default=0)
+        ts_width = max(ts_width, len('timestamp'))
+
+        col_widths = []
+        val_strs = []
+        for col_idx in range(n_cols):
+            col_name = self.series_names[col_idx] if col_idx < len(self.series_names) else f'col_{col_idx}'
+            w = len(col_name)
+            col_vals = []
+            for row_idx in range(n_rows):
+                v = self.values[row_idx, col_idx]
+                s = 'NaN' if np.isnan(v) else f'{v:.2f}'
+                col_vals.append(s)
+                w = max(w, len(s))
+            val_strs.append(col_vals)
+            col_widths.append(w)
+
+        return ts_strs, ts_width, col_widths, val_strs
+
+    def _format_rows(self, ts_strs, ts_width, col_widths, val_strs, max_rows):
+        """Format rows with optional truncation."""
+        n_rows = len(ts_strs)
+        n_cols = len(col_widths)
+
+        header_parts = ['timestamp'.rjust(ts_width)]
+        for col_idx in range(n_cols):
+            col_name = self.series_names[col_idx] if col_idx < len(self.series_names) else f'col_{col_idx}'
+            header_parts.append(col_name.rjust(col_widths[col_idx]))
+        lines = ['  '.join(header_parts)]
+
+        if max_rows is None or n_rows <= max_rows:
+            show_rows = list(range(n_rows))
+        else:
+            show_rows = list(range(max_rows))
+
+        for row_idx in show_rows:
+            parts = [ts_strs[row_idx].rjust(ts_width)]
+            for col_idx in range(n_cols):
+                parts.append(val_strs[col_idx][row_idx].rjust(col_widths[col_idx]))
+            lines.append('  '.join(parts))
+
+        return f"AlignedTimeseries({n_rows} rows, {n_cols} series)\n" + '\n'.join(lines)
+
+    def __repr__(self):
+        n_rows, n_cols = self.values.shape
+        if n_rows == 0:
+            return f"AlignedTimeseries(0 rows, {n_cols} series)"
+        ts_strs, ts_width, col_widths, val_strs = self._build_display()
+        return self._format_rows(ts_strs, ts_width, col_widths, val_strs, max_rows=20)
+
+    def show(self, max_rows: Optional[int] = None):
+        """Print formatted table with configurable row limit.
+
+        Args:
+            max_rows: Maximum rows to display. None for all rows.
+        """
+        n_rows, n_cols = self.values.shape
+        if n_rows == 0:
+            print(f"AlignedTimeseries(0 rows, {n_cols} series)")
+            return
+        ts_strs, ts_width, col_widths, val_strs = self._build_display()
+        print(self._format_rows(ts_strs, ts_width, col_widths, val_strs, max_rows))
+
+
+class Timeseries:
+    """
+    Single time series abstraction.
+
+    Supports row-based slicing (converting to time-range queries internally),
+    stats access, and length queries.
+
+    When a series spans multiple files, data is merged transparently.
+    """
+
+    def __init__(self, name: str, readers_and_infos: list, merged_timestamps: np.ndarray):
+        """
+        Args:
+            name: Full series path (e.g., "weather.Beijing.humidity").
+            readers_and_infos: List of (reader, series_info) tuples for this series.
+            merged_timestamps: Sorted, deduplicated timestamp array across all files.
+        """
+        self._name = name
+        self._readers_and_infos = readers_and_infos
+        self._timestamps = merged_timestamps
+
+    @property
+    def name(self) -> str:
+        return self._name
+
+    @property
+    def timestamps(self) -> np.ndarray:
+        return self._timestamps
+
+    @property
+    def stats(self) -> dict:
+        count = len(self._timestamps)
+        if count == 0:
+            return {'start_time': None, 'end_time': None, 'count': 0}
+        return {
+            'start_time': int(self._timestamps[0]),
+            'end_time': int(self._timestamps[-1]),
+            'count': count,
+        }
+
+    def __len__(self) -> int:
+        return len(self._timestamps)
+
+    def __getitem__(self, key):
+        """Row-based access.
+
+        - series[20] -> single float value
+        - series[20:100] -> np.ndarray of values
+        """
+        length = len(self._timestamps)
+
+        if isinstance(key, int):
+            if key < 0:
+                key = length + key
+            if key < 0 or key >= length:
+                raise IndexError(f"Index {key} out of range [0, {length})")
+            ts = int(self._timestamps[key])
+            _, vals = self._query_time_range(ts, ts)
+            return float(vals[0]) if len(vals) > 0 else None
+
+        elif isinstance(key, slice):
+            start, stop, step = key.indices(length)
+            if start >= stop:
+                return np.array([], dtype=np.float64)
+
+            # Get exact timestamps for the requested rows
+            requested_ts = self._timestamps[start:stop]
+            if len(requested_ts) == 0:
+                return np.array([], dtype=np.float64)
+
+            # Query by time range, then filter to exact timestamps
+            start_ts = int(requested_ts[0])
+            end_ts = int(requested_ts[-1])
+            ts_arr, vals = self._query_time_range(start_ts, end_ts)
+
+            # Vectorized alignment: both ts_arr and requested_ts are sorted
+            result = np.full(len(requested_ts), np.nan)
+            if len(ts_arr) > 0:
+                indices = np.searchsorted(ts_arr, requested_ts)
+                valid = (indices < len(ts_arr)) & (ts_arr[np.minimum(indices, len(ts_arr) - 1)] == requested_ts)
+                result[valid] = vals[indices[valid]]
+            vals = result
+
+            if step != 1:
+                vals = vals[::step]
+            return vals
+
+        else:
+            raise TypeError(f"Unsupported key type: {type(key)}")
+
+    def _query_time_range(self, start_time: int, end_time: int) -> Tuple[np.ndarray, np.ndarray]:
+        """Query all readers for this series in the given time range, merge results."""
+        all_ts = []
+        all_vals = []
+        for reader, info in self._readers_and_infos:
+            # Skip reader if its data doesn't overlap
+            if info['max_time'] < start_time or info['min_time'] > end_time:
+                continue
+            ts_arr, val_arr = reader.read_series_by_time_range(
+                self._name, start_time, end_time
+            )
+            if len(ts_arr) > 0:
+                all_ts.append(ts_arr)
+                all_vals.append(val_arr)
+
+        if not all_ts:
+            return np.array([], dtype=np.int64), np.array([], dtype=np.float64)
+
+        if len(all_ts) == 1:
+            return all_ts[0], all_vals[0]
+
+        # Merge from multiple readers, sort by timestamp, deduplicate
+        merged_ts = np.concatenate(all_ts)
+        merged_vals = np.concatenate(all_vals)
+        sort_idx = np.argsort(merged_ts, kind='mergesort')
+        merged_ts = merged_ts[sort_idx]
+        merged_vals = merged_vals[sort_idx]
+
+        # Deduplicate by timestamp (keep first occurrence)
+        _, unique_idx = np.unique(merged_ts, return_index=True)
+        return merged_ts[unique_idx], merged_vals[unique_idx]
+
+    def __repr__(self):
+        stats = self.stats
+        if stats['count'] == 0:
+            return f"Timeseries('{self._name}', count=0)"
+        return (
+            f"Timeseries('{self._name}', count={stats['count']}, "
+            f"start={_format_timestamp(stats['start_time'])}, "
+            f"end={_format_timestamp(stats['end_time'])})"
+        )
+
+
+class _LocIndexer:
+    """
+    Implements .loc[start_time:end_time, series_list] for time-aligned queries.
+
+    Returns AlignedTimeseries with timestamps, values, and series names.
+    """
+
+    def __init__(self, dataframe: 'TsFileDataFrame'):
+        self._df = dataframe
+
+    def _parse_key(self, key):
+        """Parse key into (start_time, end_time, series_names)."""
+        if not isinstance(key, tuple) or len(key) != 2:
+            raise ValueError(
+                "loc requires exactly 2 arguments: "
+                "tsdf.loc[start_time:end_time, series_list]"
+            )
+
+        time_slice, series_spec = key
+
+        # Parse time range
+        if isinstance(time_slice, slice):
+            start_time = time_slice.start
+            end_time = time_slice.stop
+            if start_time is None:
+                start_time = np.iinfo(np.int64).min
+            if end_time is None:
+                end_time = np.iinfo(np.int64).max
+        elif isinstance(time_slice, (int, np.integer)):
+            start_time = end_time = int(time_slice)
+        else:
+            raise TypeError(f"Time index must be slice or int, got {type(time_slice)}")
+
+        # Parse series names
+        if isinstance(series_spec, (str, int)):
+            series_spec = [series_spec]
+
+        series_names = []
+        for s in series_spec:
+            if isinstance(s, (int, np.integer)):
+                idx = int(s)
+                if idx < 0 or idx >= len(self._df._series_list):
+                    raise IndexError(f"Series index {idx} out of range")
+                series_names.append(self._df._series_list[idx])
+            elif isinstance(s, str):
+                if s not in self._df._series_map:
+                    raise KeyError(f"Series not found: {s}")
+                series_names.append(s)
+            else:
+                raise TypeError(f"Series specifier must be int or str, got {type(s)}")
+
+        return start_time, end_time, series_names
+
+    def _query_aligned(self, start_time: int, end_time: int, series_names: List[str]):
+        """Query and align multiple series using batched Arrow reads."""
+        # Group series by (reader_id, table_name, tag_tuple) for batch queries.
+        # Each group can be fetched with a single query_table_batch call.
+        groups = defaultdict(list)  # key -> [(col_idx, field_name, series_name, reader, info)]
+
+        for col_idx, name in enumerate(series_names):
+            entries = self._df._series_map[name]
+            for reader, info in entries:
+                if info['max_time'] < start_time or info['min_time'] > end_time:
+                    continue
+                key = (id(reader), info['table_name'],
+                       tuple(sorted(info['tag_values'].items())))
+                groups[key].append((col_idx, info['column_name'], name, reader, info))
+
+        # Fetch data: one query per group
+        series_data = {}  # series_name -> (ts_arr, val_arr)
+
+        for key, entries in groups.items():
+            reader = entries[0][3]
+            info = entries[0][4]
+            field_columns = list(dict.fromkeys(e[1] for e in entries))  # dedupe, keep order
+
+            ts_arr, field_vals = reader.read_multi_series_by_time_range(
+                info['table_name'], field_columns,
+                info['tag_columns'], info['tag_values'],
+                start_time, end_time,
+            )
+
+            for _col_idx, field_name, name, _, _ in entries:

Review Comment:
    Obsolete after the refactor. The implementation has moved to 
python/tsfile/dataset/dataframe.py, and that old code path no longer exists.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to