ycycse commented on code in PR #765: URL: https://github.com/apache/tsfile/pull/765#discussion_r3038744030
########## python/tsfile/tsfile_series_reader.py: ########## @@ -0,0 +1,440 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +High-performance TsFile Series Reader + +Optimized for time series data reading using the Arrow columnar API. +Wraps TsFileReaderPy (Cython) and provides series-level metadata +discovery, timestamp caching, and batch reads with TAG filtering. +""" + +import os +import sys +from typing import List, Dict, Optional, Tuple + +import numpy as np +import pyarrow.compute as pc + +from .constants import ColumnCategory +from .tsfile_reader import TsFileReaderPy + + +class TsFileSeriesReader: + """ + TsFile Series Reader + + Wrapper around the Cython TsFileReaderPy for reading TsFile data + at the series level. Supports TAG columns: a time series is uniquely + identified by Table + Tag values + Field column, producing series + paths like "weather.beijing.humidity". + """ + + def __init__(self, file_path: str, show_progress: bool = True): + if not os.path.exists(file_path): + raise FileNotFoundError(f"TsFile not found: {file_path}") + + self.file_path = file_path + self.show_progress = show_progress + + try: + self._reader = TsFileReaderPy(file_path) + except Exception as e: + raise ValueError(f"Failed to open TsFile: {e}") + + self.series_paths: List[str] = [] + self.series_info: Dict[str, dict] = {} + self._timestamps_cache: Dict[str, np.ndarray] = {} + self._series_data_cache: Dict[str, np.ndarray] = {} + + self._cache_metadata() + + def __del__(self): + self.close() + + def close(self): + """Close the underlying Cython reader.""" + if hasattr(self, '_reader'): + try: + self._reader.close() + except Exception: + pass + + def _cache_metadata(self): + """Cache metadata from the TsFile.""" + try: + self._cache_metadata_table_model() + except Exception as e: + raise ValueError( + f"Failed to read TsFile metadata. " + f"Please ensure the TsFile is valid and readable. Error: {e}" + ) + + def _cache_metadata_table_model(self): + """ + Cache metadata using table model query via Arrow batch API. + + Unified logic for tables with or without TAG columns. + """ + table_schemas = self._reader.get_all_table_schemas() + if not table_schemas: + raise ValueError("No tables found in TsFile") + + self.series_paths = [] + table_names = list(table_schemas.keys()) + + # Progress tracking + total_rows = 0 + + for ti, table_name in enumerate(table_names): + table_schema = self._reader.get_table_schema(table_name) + + tag_columns = [] + field_columns = [] + for col_schema in table_schema.get_columns(): + col_name = col_schema.get_column_name() + col_category = col_schema.get_category() + if col_name.lower() == 'time': + continue + if col_category == ColumnCategory.TAG: + tag_columns.append(col_name) + elif col_category == ColumnCategory.FIELD: + field_columns.append(col_name) + + if not field_columns: + continue + + # Query TAG columns + first FIELD column to discover groups and timestamps + query_cols = tag_columns + [field_columns[0]] + + time_arrays = [] + tag_arrays = {tc: [] for tc in tag_columns} + + with self._reader.query_table_batch( + table_name, query_cols, batch_size=65536 + ) as rs: + while True: + arrow_table = rs.read_arrow_batch() + if arrow_table is None: + break + batch_rows = arrow_table.num_rows + total_rows += batch_rows + time_arrays.append(arrow_table.column('time').to_numpy()) + for tc in tag_columns: + tag_arrays[tc].append(arrow_table.column(tc).to_numpy()) + + if self.show_progress: + sys.stderr.write( + f"\rReading TsFile metadata: " + f"table {ti + 1}/{len(table_names)} " + f"[{table_name}] " + f"({total_rows:,} rows)" + ) + sys.stderr.flush() + + if not time_arrays: + continue + + all_times = np.concatenate(time_arrays).astype(np.int64) + + if tag_columns: + # Merge tag columns and group by unique tag combinations + all_tags = {tc: np.concatenate(tag_arrays[tc]) for tc in tag_columns} + + # Build a composite key for grouping + if len(tag_columns) == 1: + tag_key = all_tags[tag_columns[0]] + unique_keys = np.unique(tag_key) + for uk in unique_keys: + mask = tag_key == uk + tag_values = (uk,) if not isinstance(uk, tuple) else uk + self._register_tag_group( + table_name, tag_columns, tag_values, + field_columns, all_times[mask] + ) + else: + # Multiple tag columns: use structured approach + # Convert to list of tuples for grouping + n = len(all_times) + tag_tuples = [ + tuple(all_tags[tc][i] for tc in tag_columns) + for i in range(n) + ] + unique_tuples = list(dict.fromkeys(tag_tuples)) + for ut in unique_tuples: + mask = np.array([t == ut for t in tag_tuples], dtype=bool) + self._register_tag_group( + table_name, tag_columns, ut, + field_columns, all_times[mask] Review Comment: Done. The implementation has moved to python/tsfile/dataset/reader.py. The pure-Python tuple/mask grouping path was replaced with a vectorized NumPy grouping implementation based on structured arrays and np.unique(..., return_inverse=True). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
