This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch rc/2.2.0 in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit 3cf7514ae80ee5dc644728778d1c1f94855b27d4 Author: Colin Lee <[email protected]> AuthorDate: Mon Nov 3 13:49:32 2025 +0800 support python to_dataframe. (#624) --- python/tests/test_write_and_read.py | 43 ++++++++++++++++++++++-- python/tsfile/__init__.py | 3 +- python/tsfile/exceptions.py | 31 ++++++++++++++++- python/tsfile/schema.py | 3 ++ python/tsfile/utils.py | 67 +++++++++++++++++++++++++++++++++++++ 5 files changed, 142 insertions(+), 5 deletions(-) diff --git a/python/tests/test_write_and_read.py b/python/tests/test_write_and_read.py index 78731899..e5c87ab9 100644 --- a/python/tests/test_write_and_read.py +++ b/python/tests/test_write_and_read.py @@ -18,15 +18,17 @@ import os +import numpy as np import pytest - -from tsfile import ColumnSchema, TableSchema, TSEncoding, NotSupportedError +from tsfile import ColumnSchema, TableSchema, TSEncoding +from tsfile import Compressor from tsfile import TSDataType from tsfile import Tablet, RowRecord, Field from tsfile import TimeseriesSchema from tsfile import TsFileTableWriter from tsfile import TsFileWriter, TsFileReader, ColumnCategory -from tsfile import Compressor +from tsfile import to_dataframe +from tsfile.exceptions import TableNotExistError, ColumnNotExistError, NotSupportedError def test_row_record_write_and_read(): @@ -60,6 +62,7 @@ def test_row_record_write_and_read(): if os.path.exists("record_write_and_read.tsfile"): os.remove("record_write_and_read.tsfile") + @pytest.mark.skip(reason="API not match") def test_tablet_write_and_read(): try: @@ -268,3 +271,37 @@ def test_tsfile_config(): set_tsfile_config({"float_encoding_type_": TSEncoding.BITMAP}) with pytest.raises(NotSupportedError): set_tsfile_config({"time_compress_type_": Compressor.PAA}) + + +def test_tsfile_to_df(): + table = TableSchema("test_table", + [ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("value", TSDataType.DOUBLE, ColumnCategory.FIELD), + ColumnSchema("value2", TSDataType.INT64, ColumnCategory.FIELD)]) + try: + with TsFileTableWriter("table_write_to_df.tsfile", table) as writer: + tablet = Tablet(["device", "value", "value2"], + [TSDataType.STRING, TSDataType.DOUBLE, TSDataType.INT64], 4097) + for i in range(4097): + tablet.add_timestamp(i, i) + tablet.add_value_by_name("device", i, "device" + str(i)) + tablet.add_value_by_index(1, i, i * 100.0) + tablet.add_value_by_index(2, i, i * 100) + writer.write_table(tablet) + df1 = to_dataframe("table_write_to_df.tsfile") + assert df1.shape == (4097, 4) + assert df1["value2"].sum() == 100 * (1 + 4096) / 2 * 4096 + assert df1["time"].dtype == np.int64 + assert df1["value"].dtype == np.float64 + assert df1["value2"].dtype == np.int64 + df2 = to_dataframe("table_write_to_df.tsfile", column_names=["device", "value2"]) + assert df2.shape == (4097, 3) + assert df1["value2"].equals(df2["value2"]) + df3 = to_dataframe("table_write_to_df.tsfile", column_names=["device", "value"], max_row_num=8000) + assert df3.shape == (4097, 3) + with pytest.raises(TableNotExistError): + to_dataframe("table_write_to_df.tsfile", "test_tb") + with pytest.raises(ColumnNotExistError): + to_dataframe("table_write_to_df.tsfile", "test_table", ["device1"]) + finally: + os.remove("table_write_to_df.tsfile") diff --git a/python/tsfile/__init__.py b/python/tsfile/__init__.py index 0c5081fa..bf755fce 100644 --- a/python/tsfile/__init__.py +++ b/python/tsfile/__init__.py @@ -33,4 +33,5 @@ from .exceptions import * from .tsfile_reader import TsFileReaderPy as TsFileReader, ResultSetPy as ResultSet from .tsfile_writer import TsFileWriterPy as TsFileWriter from .tsfile_py_cpp import get_tsfile_config, set_tsfile_config -from .tsfile_table_writer import TsFileTableWriter \ No newline at end of file +from .tsfile_table_writer import TsFileTableWriter +from .utils import to_dataframe \ No newline at end of file diff --git a/python/tsfile/exceptions.py b/python/tsfile/exceptions.py index 15575954..2a3df283 100644 --- a/python/tsfile/exceptions.py +++ b/python/tsfile/exceptions.py @@ -24,81 +24,101 @@ class LibraryError(Exception): self.code = code if code is not None else self._default_code self.message = context if context is not None else self._default_message super().__init__(f"[{code}] {self.message}") + def __str__(self): return f"{self.code}: {self.message}" + class OOMError(LibraryError): _default_message = "Out of memory" _default_code = 1 + class NotExistsError(LibraryError): _default_message = "Requested resource does not exist" _default_code = 2 + class AlreadyExistsError(LibraryError): _default_message = "Resource already exists" _default_code = 3 + class InvalidArgumentError(LibraryError): _default_message = "Invalid argument provided" _default_code = 4 + class OutOfRangeError(LibraryError): _default_message = "Value out of valid range" _default_code = 5 + class PartialReadError(LibraryError): _default_message = "Incomplete data read operation" _default_code = 6 + class FileOpenError(LibraryError): _default_message = "Failed to open file" _default_code = 28 + class FileCloseError(LibraryError): _default_message = "Failed to close file" _default_code = 29 + class FileWriteError(LibraryError): _default_message = "Failed to write to file" _default_code = 30 + class FileReadError(LibraryError): _default_message = "Failed to read from file" _default_code = 31 + class FileSyncError(LibraryError): _default_message = "Failed to sync file contents" _default_code = 32 + class MetadataError(LibraryError): _default_message = "Metadata inconsistency detected" _default_code = 33 + class BufferNotEnoughError(LibraryError): _default_message = "Insufficient buffer space" _default_code = 36 + class NotSupportedError(LibraryError): _default_message = "Not support yet" _default_code = 40 + class DeviceNotExistError(LibraryError): _default_message = "Requested device does not exist" _default_code = 44 + class MeasurementNotExistError(LibraryError): _default_message = "Specified measurement does not exist" _default_code = 45 + class InvalidQueryError(LibraryError): _default_message = "Malformed query syntax" _default_code = 46 + class CompressionError(LibraryError): _default_message = "Data compression/decompression failed" _default_code = 48 + class TableNotExistError(LibraryError): _default_message = "Requested table does not exist" _default_code = 49 @@ -108,10 +128,17 @@ class TypeNotSupportedError(LibraryError): _default_message = "Unsupported data type" _default_code = 26 + class TypeMismatchError(LibraryError): _default_message = "Data type mismatch" _default_code = 27 + +class ColumnNotExistError(LibraryError): + _default_message = "Column does not exist" + _default_code = 50 + + ERROR_MAPPING = { 1: OOMError, 2: NotExistsError, @@ -134,9 +161,11 @@ ERROR_MAPPING = { 46: InvalidQueryError, 48: CompressionError, 49: TableNotExistError, + 50: ColumnNotExistError, } -def get_exception(code : int, context : str = None): + +def get_exception(code: int, context: str = None): if code == 0: return None diff --git a/python/tsfile/schema.py b/python/tsfile/schema.py index c62b0d01..ae7960c5 100644 --- a/python/tsfile/schema.py +++ b/python/tsfile/schema.py @@ -114,6 +114,9 @@ class TableSchema: def get_columns(self): return self.columns + def get_column_names(self): + return [name.get_column_name() for name in self.columns] + def __repr__(self) -> str: return f"TableSchema({self.table_name}, {self.columns})" diff --git a/python/tsfile/utils.py b/python/tsfile/utils.py new file mode 100644 index 00000000..3d236606 --- /dev/null +++ b/python/tsfile/utils.py @@ -0,0 +1,67 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import pandas as pd + +from tsfile.exceptions import TableNotExistError, ColumnNotExistError +from tsfile.tsfile_reader import TsFileReaderPy + + +def to_dataframe(file_path: str, + table_name: str = None, + column_names: list[str] = None, + max_row_num: int = None) -> pd.DataFrame: + with TsFileReaderPy(file_path) as reader: + total_rows = 0 + table_schema = reader.get_all_table_schemas() + if len(table_schema) == 0: + raise TableNotExistError("Not found any table in the TsFile.") + if table_name is None: + # get the first table name by default + table_name, columns = next(iter(table_schema.items())) + else: + if table_name not in table_schema: + raise TableNotExistError(table_name) + columns = table_schema[table_name] + + column_names_in_file = columns.get_column_names() + + if column_names is not None: + for column in column_names: + if column not in column_names_in_file: + raise ColumnNotExistError(column) + else: + column_names = column_names_in_file + + df_list: list[pd.DataFrame] = [] + + with reader.query_table(table_name, column_names) as result: + while result.next(): + if max_row_num is not None: + remaining_rows = max_row_num - total_rows + if remaining_rows <= 0: + break + else: + batch_rows = min(remaining_rows, 1024) + df = result.read_data_frame(batch_rows) + total_rows += len(df) + else: + df = result.read_data_frame() + df_list.append(df) + df = pd.concat(df_list, ignore_index=True) + return df
