Copilot commented on code in PR #765: URL: https://github.com/apache/tsfile/pull/765#discussion_r3028204703
########## python/tsfile/tsfile_series_reader.py: ########## @@ -0,0 +1,440 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +High-performance TsFile Series Reader + +Optimized for time series data reading using the Arrow columnar API. +Wraps TsFileReaderPy (Cython) and provides series-level metadata +discovery, timestamp caching, and batch reads with TAG filtering. +""" + +import os +import sys +from typing import List, Dict, Optional, Tuple + +import numpy as np +import pyarrow.compute as pc + +from .constants import ColumnCategory +from .tsfile_reader import TsFileReaderPy + + +class TsFileSeriesReader: + """ + TsFile Series Reader + + Wrapper around the Cython TsFileReaderPy for reading TsFile data + at the series level. Supports TAG columns: a time series is uniquely + identified by Table + Tag values + Field column, producing series + paths like "weather.beijing.humidity". 
+ """ + + def __init__(self, file_path: str, show_progress: bool = True): + if not os.path.exists(file_path): + raise FileNotFoundError(f"TsFile not found: {file_path}") + + self.file_path = file_path + self.show_progress = show_progress + + try: + self._reader = TsFileReaderPy(file_path) + except Exception as e: + raise ValueError(f"Failed to open TsFile: {e}") Review Comment: When re-raising the exception from `TsFileReaderPy(file_path)` as a `ValueError` without `from e`, the original error is attached only as implicit context (`__context__`) rather than as the explicit cause of the new exception. Prefer explicit exception chaining (`raise ValueError(...) from e`) so callers can debug the underlying failure more easily. ########## python/tsfile/tsfile_dataframe.py: ########## @@ -0,0 +1,859 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +TsFileDataFrame - Lazy-loaded unified view over multiple TsFile files. + +Provides array-like and DataFrame-like access to time series data +stored in TsFile format. Supports: +1. Pre-training: random index access with array-like slicing +2. 
Post-training: time-aligned multi-series queries via .loc +""" + +import os +import sys +from collections import defaultdict +from typing import List, Dict, Union, Optional, Tuple +from datetime import datetime + +import numpy as np + + +def _format_timestamp(ts_ms: int) -> str: + """Convert millisecond timestamp to human-readable string.""" + try: + return datetime.fromtimestamp(ts_ms / 1000).strftime('%Y-%m-%d %H:%M:%S') + except (OSError, ValueError): + return str(ts_ms) + + +class AlignedTimeseries: + """ + Time-aligned multi-series query result with timestamps. + + Returned by .loc[...] and df[slice/list]. Supports: + - result.timestamps -> np.ndarray of ms timestamps + - result.values -> np.ndarray of shape (rows, cols) + - result.series_names -> list of series name strings + - result[i] / result[i, j] -> index into values + - print(result) -> truncated table (20 rows) + - result.show() -> full table (no truncation) + - result.show(50) -> show up to 50 rows + """ Review Comment: Docstring says `AlignedTimeseries` is "Returned by ... df[slice/list]", but `TsFileDataFrame.__getitem__` for slice/list returns a subset `TsFileDataFrame`, not an `AlignedTimeseries`. Please update the docstring to match actual behavior (or implement the documented behavior). ########## python/tsfile/tsfile_series_reader.py: ########## @@ -0,0 +1,440 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +High-performance TsFile Series Reader + +Optimized for time series data reading using the Arrow columnar API. +Wraps TsFileReaderPy (Cython) and provides series-level metadata +discovery, timestamp caching, and batch reads with TAG filtering. +""" + +import os +import sys +from typing import List, Dict, Optional, Tuple + +import numpy as np +import pyarrow.compute as pc + +from .constants import ColumnCategory +from .tsfile_reader import TsFileReaderPy + + +class TsFileSeriesReader: + """ + TsFile Series Reader + + Wrapper around the Cython TsFileReaderPy for reading TsFile data + at the series level. Supports TAG columns: a time series is uniquely + identified by Table + Tag values + Field column, producing series + paths like "weather.beijing.humidity". 
+ """ + + def __init__(self, file_path: str, show_progress: bool = True): + if not os.path.exists(file_path): + raise FileNotFoundError(f"TsFile not found: {file_path}") + + self.file_path = file_path + self.show_progress = show_progress + + try: + self._reader = TsFileReaderPy(file_path) + except Exception as e: + raise ValueError(f"Failed to open TsFile: {e}") + + self.series_paths: List[str] = [] + self.series_info: Dict[str, dict] = {} + self._timestamps_cache: Dict[str, np.ndarray] = {} + self._series_data_cache: Dict[str, np.ndarray] = {} + + self._cache_metadata() + + def __del__(self): + self.close() + + def close(self): + """Close the underlying Cython reader.""" + if hasattr(self, '_reader'): + try: + self._reader.close() + except Exception: + pass + + def _cache_metadata(self): + """Cache metadata from the TsFile.""" + try: + self._cache_metadata_table_model() + except Exception as e: + raise ValueError( + f"Failed to read TsFile metadata. " + f"Please ensure the TsFile is valid and readable. Error: {e}" + ) + + def _cache_metadata_table_model(self): + """ + Cache metadata using table model query via Arrow batch API. + + Unified logic for tables with or without TAG columns. 
+ """ + table_schemas = self._reader.get_all_table_schemas() + if not table_schemas: + raise ValueError("No tables found in TsFile") + + self.series_paths = [] + table_names = list(table_schemas.keys()) + + # Progress tracking + total_rows = 0 + + for ti, table_name in enumerate(table_names): + table_schema = self._reader.get_table_schema(table_name) + + tag_columns = [] + field_columns = [] + for col_schema in table_schema.get_columns(): + col_name = col_schema.get_column_name() + col_category = col_schema.get_category() + if col_name.lower() == 'time': + continue + if col_category == ColumnCategory.TAG: + tag_columns.append(col_name) + elif col_category == ColumnCategory.FIELD: + field_columns.append(col_name) + + if not field_columns: + continue + + # Query TAG columns + first FIELD column to discover groups and timestamps + query_cols = tag_columns + [field_columns[0]] + + time_arrays = [] + tag_arrays = {tc: [] for tc in tag_columns} + + with self._reader.query_table_batch( + table_name, query_cols, batch_size=65536 + ) as rs: + while True: + arrow_table = rs.read_arrow_batch() + if arrow_table is None: + break + batch_rows = arrow_table.num_rows + total_rows += batch_rows + time_arrays.append(arrow_table.column('time').to_numpy()) + for tc in tag_columns: + tag_arrays[tc].append(arrow_table.column(tc).to_numpy()) + + if self.show_progress: + sys.stderr.write( + f"\rReading TsFile metadata: " + f"table {ti + 1}/{len(table_names)} " + f"[{table_name}] " + f"({total_rows:,} rows)" + ) + sys.stderr.flush() + + if not time_arrays: + continue + + all_times = np.concatenate(time_arrays).astype(np.int64) + + if tag_columns: + # Merge tag columns and group by unique tag combinations + all_tags = {tc: np.concatenate(tag_arrays[tc]) for tc in tag_columns} + + # Build a composite key for grouping + if len(tag_columns) == 1: + tag_key = all_tags[tag_columns[0]] + unique_keys = np.unique(tag_key) + for uk in unique_keys: + mask = tag_key == uk + tag_values = (uk,) if not 
isinstance(uk, tuple) else uk + self._register_tag_group( + table_name, tag_columns, tag_values, + field_columns, all_times[mask] + ) + else: + # Multiple tag columns: use structured approach + # Convert to list of tuples for grouping + n = len(all_times) + tag_tuples = [ + tuple(all_tags[tc][i] for tc in tag_columns) + for i in range(n) + ] + unique_tuples = list(dict.fromkeys(tag_tuples)) + for ut in unique_tuples: + mask = np.array([t == ut for t in tag_tuples], dtype=bool) + self._register_tag_group( + table_name, tag_columns, ut, + field_columns, all_times[mask] + ) + else: + # No TAG columns: single group + self._register_tag_group( + table_name, tag_columns, (), + field_columns, all_times + ) + + if self.show_progress and total_rows > 0: + sys.stderr.write( + f"\rReading TsFile metadata: " + f"{len(table_names)} table(s), " + f"{total_rows:,} rows, " + f"{len(self.series_paths)} series " + f"... done\n" + ) + sys.stderr.flush() + + if not self.series_paths: + raise ValueError("No valid numeric series found in TsFile") + + def _register_tag_group( + self, table_name: str, tag_columns: List[str], + tag_values: tuple, field_columns: List[str], timestamps: np.ndarray + ): + """Register all field series for a given table + tag group.""" + timestamps = np.sort(timestamps) + + if len(timestamps) == 0: + return + + if tag_columns: + tag_part = ".".join(str(v) for v in tag_values) + else: + tag_part = "" + + tag_values_dict = dict(zip(tag_columns, tag_values)) if tag_columns else {} + + for field_col in field_columns: + if tag_part: + series_path = f"{table_name}.{tag_part}.{field_col}" + else: + series_path = f"{table_name}.{field_col}" + + self.series_paths.append(series_path) + self._timestamps_cache[series_path] = timestamps + self.series_info[series_path] = { + 'length': len(timestamps), + 'min_time': int(timestamps[0]), + 'max_time': int(timestamps[-1]), + 'table_name': table_name, + 'column_name': field_col, + 'tag_columns': tag_columns, + 'tag_values': 
tag_values_dict, + } + + def get_all_series(self) -> List[str]: + """Return a list of all discovered series paths.""" + return self.series_paths.copy() + + def get_series_length(self, series_path: str) -> int: + """Return the number of data points for a series.""" + if series_path not in self.series_info: + raise ValueError(f"Series not found: {series_path}") + return self.series_info[series_path]['length'] + + def read_series(self, series_path: str) -> List[float]: + """Read all data points for a series. + + Args: + series_path: Time series path. + + Returns: + List of data points. + """ + if series_path not in self.series_info: + raise ValueError(f"Series not found: {series_path}") + if series_path in self._series_data_cache: + return self._series_data_cache[series_path].tolist() + length = self.series_info[series_path]['length'] + return self.read_series_range(series_path, 0, length) + + def read_series_range(self, series_path: str, start: int, end: int) -> List[float]: + """Read specified range of time series by row index. + + Args: + series_path: Time series path. + start: Start index (inclusive). + end: End index (exclusive). + + Returns: + List of data points. + """ + if series_path not in self.series_info: + raise ValueError(f"Series not found: {series_path}") + + if series_path in self._series_data_cache: + return self._series_data_cache[series_path][start:end].tolist() + + info = self.series_info[series_path] + timestamps = self._timestamps_cache[series_path] + + start_time = int(timestamps[start]) + end_time = int(timestamps[end - 1]) + Review Comment: `read_series_range()` assumes `start < end` and will raise or return incorrect results when `start == end` (it computes `end_time = timestamps[end - 1]`). Consider explicitly handling empty ranges (return `[]`) and validating bounds early with a clear error (e.g., `start < 0`, `end > len`, `start > end`). 
########## python/tsfile/tsfile_dataframe.py: ########## @@ -0,0 +1,859 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +TsFileDataFrame - Lazy-loaded unified view over multiple TsFile files. + +Provides array-like and DataFrame-like access to time series data +stored in TsFile format. Supports: +1. Pre-training: random index access with array-like slicing +2. Post-training: time-aligned multi-series queries via .loc +""" + +import os +import sys +from collections import defaultdict +from typing import List, Dict, Union, Optional, Tuple +from datetime import datetime + +import numpy as np + + +def _format_timestamp(ts_ms: int) -> str: + """Convert millisecond timestamp to human-readable string.""" + try: + return datetime.fromtimestamp(ts_ms / 1000).strftime('%Y-%m-%d %H:%M:%S') + except (OSError, ValueError): + return str(ts_ms) + + +class AlignedTimeseries: + """ + Time-aligned multi-series query result with timestamps. + + Returned by .loc[...] and df[slice/list]. 
Supports: + - result.timestamps -> np.ndarray of ms timestamps + - result.values -> np.ndarray of shape (rows, cols) + - result.series_names -> list of series name strings + - result[i] / result[i, j] -> index into values + - print(result) -> truncated table (20 rows) + - result.show() -> full table (no truncation) + - result.show(50) -> show up to 50 rows + """ + + def __init__(self, timestamps: np.ndarray, values: np.ndarray, series_names: List[str]): + self.timestamps = timestamps + self.values = values + self.series_names = series_names + + @property + def shape(self): + return self.values.shape + + def __len__(self): + return len(self.timestamps) + + def __getitem__(self, key): + return self.values[key] + + def _build_display(self): + """Pre-compute string representations for display.""" + n_rows, n_cols = self.values.shape + ts_strs = [_format_timestamp(int(t)) for t in self.timestamps] + ts_width = max((len(s) for s in ts_strs), default=0) + ts_width = max(ts_width, len('timestamp')) + + col_widths = [] + val_strs = [] + for col_idx in range(n_cols): + col_name = self.series_names[col_idx] if col_idx < len(self.series_names) else f'col_{col_idx}' + w = len(col_name) + col_vals = [] + for row_idx in range(n_rows): + v = self.values[row_idx, col_idx] + s = 'NaN' if np.isnan(v) else f'{v:.2f}' + col_vals.append(s) + w = max(w, len(s)) + val_strs.append(col_vals) + col_widths.append(w) + + return ts_strs, ts_width, col_widths, val_strs + + def _format_rows(self, ts_strs, ts_width, col_widths, val_strs, max_rows): + """Format rows with optional truncation.""" + n_rows = len(ts_strs) + n_cols = len(col_widths) + + header_parts = ['timestamp'.rjust(ts_width)] + for col_idx in range(n_cols): + col_name = self.series_names[col_idx] if col_idx < len(self.series_names) else f'col_{col_idx}' + header_parts.append(col_name.rjust(col_widths[col_idx])) + lines = [' '.join(header_parts)] + + if max_rows is None or n_rows <= max_rows: + show_rows = list(range(n_rows)) + 
else: + show_rows = list(range(max_rows)) + + for row_idx in show_rows: + parts = [ts_strs[row_idx].rjust(ts_width)] + for col_idx in range(n_cols): + parts.append(val_strs[col_idx][row_idx].rjust(col_widths[col_idx])) + lines.append(' '.join(parts)) + + return f"AlignedTimeseries({n_rows} rows, {n_cols} series)\n" + '\n'.join(lines) + + def __repr__(self): + n_rows, n_cols = self.values.shape + if n_rows == 0: + return f"AlignedTimeseries(0 rows, {n_cols} series)" + ts_strs, ts_width, col_widths, val_strs = self._build_display() + return self._format_rows(ts_strs, ts_width, col_widths, val_strs, max_rows=20) + + def show(self, max_rows: Optional[int] = None): + """Print formatted table with configurable row limit. + + Args: + max_rows: Maximum rows to display. None for all rows. + """ + n_rows, n_cols = self.values.shape + if n_rows == 0: + print(f"AlignedTimeseries(0 rows, {n_cols} series)") + return + ts_strs, ts_width, col_widths, val_strs = self._build_display() + print(self._format_rows(ts_strs, ts_width, col_widths, val_strs, max_rows)) + + +class Timeseries: + """ + Single time series abstraction. + + Supports row-based slicing (converting to time-range queries internally), + stats access, and length queries. + + When a series spans multiple files, data is merged transparently. + """ + + def __init__(self, name: str, readers_and_infos: list, merged_timestamps: np.ndarray): + """ + Args: + name: Full series path (e.g., "weather.Beijing.humidity"). + readers_and_infos: List of (reader, series_info) tuples for this series. + merged_timestamps: Sorted, deduplicated timestamp array across all files. 
+ """ + self._name = name + self._readers_and_infos = readers_and_infos + self._timestamps = merged_timestamps + + @property + def name(self) -> str: + return self._name + + @property + def timestamps(self) -> np.ndarray: + return self._timestamps + + @property + def stats(self) -> dict: + count = len(self._timestamps) + if count == 0: + return {'start_time': None, 'end_time': None, 'count': 0} + return { + 'start_time': int(self._timestamps[0]), + 'end_time': int(self._timestamps[-1]), + 'count': count, + } + + def __len__(self) -> int: + return len(self._timestamps) + + def __getitem__(self, key): + """Row-based access. + + - series[20] -> single float value + - series[20:100] -> np.ndarray of values + """ + length = len(self._timestamps) + + if isinstance(key, int): + if key < 0: + key = length + key + if key < 0 or key >= length: + raise IndexError(f"Index {key} out of range [0, {length})") + ts = int(self._timestamps[key]) + _, vals = self._query_time_range(ts, ts) + return float(vals[0]) if len(vals) > 0 else None + + elif isinstance(key, slice): + start, stop, step = key.indices(length) + if start >= stop: + return np.array([], dtype=np.float64) + + # Get exact timestamps for the requested rows + requested_ts = self._timestamps[start:stop] + if len(requested_ts) == 0: + return np.array([], dtype=np.float64) + + # Query by time range, then filter to exact timestamps + start_ts = int(requested_ts[0]) + end_ts = int(requested_ts[-1]) + ts_arr, vals = self._query_time_range(start_ts, end_ts) + + # Vectorized alignment: both ts_arr and requested_ts are sorted + result = np.full(len(requested_ts), np.nan) + if len(ts_arr) > 0: + indices = np.searchsorted(ts_arr, requested_ts) + valid = (indices < len(ts_arr)) & (ts_arr[np.minimum(indices, len(ts_arr) - 1)] == requested_ts) + result[valid] = vals[indices[valid]] + vals = result + + if step != 1: + vals = vals[::step] + return vals + + else: + raise TypeError(f"Unsupported key type: {type(key)}") + + def 
_query_time_range(self, start_time: int, end_time: int) -> Tuple[np.ndarray, np.ndarray]: + """Query all readers for this series in the given time range, merge results.""" + all_ts = [] + all_vals = [] + for reader, info in self._readers_and_infos: + # Skip reader if its data doesn't overlap + if info['max_time'] < start_time or info['min_time'] > end_time: + continue + ts_arr, val_arr = reader.read_series_by_time_range( + self._name, start_time, end_time + ) + if len(ts_arr) > 0: + all_ts.append(ts_arr) + all_vals.append(val_arr) + + if not all_ts: + return np.array([], dtype=np.int64), np.array([], dtype=np.float64) + + if len(all_ts) == 1: + return all_ts[0], all_vals[0] + + # Merge from multiple readers, sort by timestamp, deduplicate + merged_ts = np.concatenate(all_ts) + merged_vals = np.concatenate(all_vals) + sort_idx = np.argsort(merged_ts, kind='mergesort') + merged_ts = merged_ts[sort_idx] + merged_vals = merged_vals[sort_idx] + + # Deduplicate by timestamp (keep first occurrence) + _, unique_idx = np.unique(merged_ts, return_index=True) + return merged_ts[unique_idx], merged_vals[unique_idx] + + def __repr__(self): + stats = self.stats + if stats['count'] == 0: + return f"Timeseries('{self._name}', count=0)" + return ( + f"Timeseries('{self._name}', count={stats['count']}, " + f"start={_format_timestamp(stats['start_time'])}, " + f"end={_format_timestamp(stats['end_time'])})" + ) + + +class _LocIndexer: + """ + Implements .loc[start_time:end_time, series_list] for time-aligned queries. + + Returns AlignedTimeseries with timestamps, values, and series names. 
+ """ + + def __init__(self, dataframe: 'TsFileDataFrame'): + self._df = dataframe + + def _parse_key(self, key): + """Parse key into (start_time, end_time, series_names).""" + if not isinstance(key, tuple) or len(key) != 2: + raise ValueError( + "loc requires exactly 2 arguments: " + "tsdf.loc[start_time:end_time, series_list]" + ) + + time_slice, series_spec = key + + # Parse time range + if isinstance(time_slice, slice): + start_time = time_slice.start + end_time = time_slice.stop + if start_time is None: + start_time = np.iinfo(np.int64).min + if end_time is None: + end_time = np.iinfo(np.int64).max + elif isinstance(time_slice, (int, np.integer)): + start_time = end_time = int(time_slice) + else: + raise TypeError(f"Time index must be slice or int, got {type(time_slice)}") + + # Parse series names + if isinstance(series_spec, (str, int)): + series_spec = [series_spec] + + series_names = [] + for s in series_spec: + if isinstance(s, (int, np.integer)): + idx = int(s) + if idx < 0 or idx >= len(self._df._series_list): + raise IndexError(f"Series index {idx} out of range") + series_names.append(self._df._series_list[idx]) + elif isinstance(s, str): + if s not in self._df._series_map: + raise KeyError(f"Series not found: {s}") + series_names.append(s) + else: + raise TypeError(f"Series specifier must be int or str, got {type(s)}") + + return start_time, end_time, series_names + + def _query_aligned(self, start_time: int, end_time: int, series_names: List[str]): + """Query and align multiple series using batched Arrow reads.""" + # Group series by (reader_id, table_name, tag_tuple) for batch queries. + # Each group can be fetched with a single query_table_batch call. 
+ groups = defaultdict(list) # key -> [(col_idx, field_name, series_name, reader, info)] + + for col_idx, name in enumerate(series_names): + entries = self._df._series_map[name] + for reader, info in entries: + if info['max_time'] < start_time or info['min_time'] > end_time: + continue + key = (id(reader), info['table_name'], + tuple(sorted(info['tag_values'].items()))) + groups[key].append((col_idx, info['column_name'], name, reader, info)) + + # Fetch data: one query per group + series_data = {} # series_name -> (ts_arr, val_arr) + + for key, entries in groups.items(): + reader = entries[0][3] + info = entries[0][4] + field_columns = list(dict.fromkeys(e[1] for e in entries)) # dedupe, keep order + + ts_arr, field_vals = reader.read_multi_series_by_time_range( + info['table_name'], field_columns, + info['tag_columns'], info['tag_values'], + start_time, end_time, + ) + + for _col_idx, field_name, name, _, _ in entries: + if name in series_data: + # Series spans multiple readers: merge + prev_ts, prev_val = series_data[name] + merged_ts = np.concatenate([prev_ts, ts_arr]) + merged_val = np.concatenate([prev_val, field_vals[field_name]]) + sort_idx = np.argsort(merged_ts, kind='mergesort') + merged_ts = merged_ts[sort_idx] + merged_val = merged_val[sort_idx] + _, unique_idx = np.unique(merged_ts, return_index=True) + series_data[name] = (merged_ts[unique_idx], merged_val[unique_idx]) + else: + series_data[name] = (ts_arr, field_vals[field_name]) + + # Collect unique timestamps using numpy + all_ts_arrays = [data[0] for data in series_data.values() if len(data[0]) > 0] + if not all_ts_arrays: + return (np.array([], dtype=np.int64), + np.array([]).reshape(0, len(series_names))) + + sorted_timestamps = np.unique(np.concatenate(all_ts_arrays)) + + # Build result matrix using np.searchsorted for vectorized alignment + result = np.full((len(sorted_timestamps), len(series_names)), np.nan) + + for col_idx, name in enumerate(series_names): + if name not in series_data: + 
continue + ts_arr, val_arr = series_data[name] + if len(ts_arr) == 0: + continue + indices = np.searchsorted(sorted_timestamps, ts_arr) + result[indices, col_idx] = val_arr + + return sorted_timestamps, result + + def __getitem__(self, key) -> 'AlignedTimeseries': + start_time, end_time, series_names = self._parse_key(key) + timestamps, values = self._query_aligned(start_time, end_time, series_names) + return AlignedTimeseries(timestamps, values, series_names) + + +class TsFileDataFrame: + """ + Lazy-loaded unified view over multiple TsFile files. + + Each dimension is a time series identified by Table + Tags + Field. + Supports cross-file merging of same-named series. + """ + + def __init__(self, paths: Union[str, List[str]], show_progress: bool = True): + if isinstance(paths, str): + paths = [paths] + + # Expand directories: collect all .tsfile files within each directory + expanded = [] + for p in paths: + if os.path.isdir(p): + tsfiles = sorted( + os.path.join(root, f) + for root, _, files in os.walk(p) + for f in files + if f.endswith('.tsfile') + ) + if not tsfiles: + raise FileNotFoundError( + f"No .tsfile files found in directory: {p}" + ) + expanded.extend(tsfiles) + else: + expanded.append(p) + + self._paths = [] + for p in expanded: + if not os.path.exists(p): + raise FileNotFoundError(f"TsFile not found: {p}") + self._paths.append(os.path.abspath(p)) + + self._show_progress = show_progress + self._readers = {} + self._series_list: List[str] = [] + self._series_map: Dict[str, list] = {} # series_path -> [(reader, info), ...] + self._merged_timestamps: Dict[str, np.ndarray] = {} + self._merged_info: Dict[str, dict] = {} + self._name_to_index: Dict[str, int] = {} + + self._is_view = False + self._root = None + + self._load_metadata() + + @classmethod + def _from_subset(cls, parent: 'TsFileDataFrame', series_names: List[str]) -> 'TsFileDataFrame': + """Create a lightweight view over a subset of series. 
+ + Shares readers, series_map, merged_timestamps, and merged_info + with the root. Does NOT own any readers and will not close them. + """ + obj = object.__new__(cls) + obj._root = parent._root if parent._is_view else parent + obj._is_view = True + obj._paths = parent._paths + obj._readers = parent._readers + obj._series_map = parent._series_map + obj._merged_timestamps = parent._merged_timestamps + obj._merged_info = parent._merged_info + obj._show_progress = parent._show_progress + obj._series_list = list(series_names) + obj._name_to_index = {name: i for i, name in enumerate(series_names)} + return obj + + def _load_metadata(self): + """Load metadata from all TsFile files.""" + from .tsfile_series_reader import TsFileSeriesReader + + if len(self._paths) >= 2: + self._load_metadata_parallel(TsFileSeriesReader) + else: + self._load_metadata_serial(TsFileSeriesReader) + Review Comment: `TsFileDataFrame`, `Timeseries`, and `.loc[...]` introduce substantial new public API and non-trivial merging/alignment logic, but there are currently no pytest tests covering them (no `TsFileDataFrame` references under `python/tests`). Please add focused tests for: metadata loading from multiple shards, `__getitem__` by index/name/slice, `Timeseries` row slicing, and `.loc` time-range alignment across multiple series/files. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
