This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch rc/2.2.0
in repository https://gitbox.apache.org/repos/asf/tsfile.git

commit 3cf7514ae80ee5dc644728778d1c1f94855b27d4
Author: Colin Lee <[email protected]>
AuthorDate: Mon Nov 3 13:49:32 2025 +0800

    support python to_dataframe. (#624)
---
 python/tests/test_write_and_read.py | 43 ++++++++++++++++++++++--
 python/tsfile/__init__.py           |  3 +-
 python/tsfile/exceptions.py         | 31 ++++++++++++++++-
 python/tsfile/schema.py             |  3 ++
 python/tsfile/utils.py              | 67 +++++++++++++++++++++++++++++++++++++
 5 files changed, 142 insertions(+), 5 deletions(-)

diff --git a/python/tests/test_write_and_read.py 
b/python/tests/test_write_and_read.py
index 78731899..e5c87ab9 100644
--- a/python/tests/test_write_and_read.py
+++ b/python/tests/test_write_and_read.py
@@ -18,15 +18,17 @@
 
 import os
 
+import numpy as np
 import pytest
-
-from tsfile import ColumnSchema, TableSchema, TSEncoding, NotSupportedError
+from tsfile import ColumnSchema, TableSchema, TSEncoding
+from tsfile import Compressor
 from tsfile import TSDataType
 from tsfile import Tablet, RowRecord, Field
 from tsfile import TimeseriesSchema
 from tsfile import TsFileTableWriter
 from tsfile import TsFileWriter, TsFileReader, ColumnCategory
-from tsfile import Compressor
+from tsfile import to_dataframe
+from tsfile.exceptions import TableNotExistError, ColumnNotExistError, 
NotSupportedError
 
 
 def test_row_record_write_and_read():
@@ -60,6 +62,7 @@ def test_row_record_write_and_read():
         if os.path.exists("record_write_and_read.tsfile"):
             os.remove("record_write_and_read.tsfile")
 
+
 @pytest.mark.skip(reason="API not match")
 def test_tablet_write_and_read():
     try:
@@ -268,3 +271,37 @@ def test_tsfile_config():
         set_tsfile_config({"float_encoding_type_": TSEncoding.BITMAP})
     with pytest.raises(NotSupportedError):
         set_tsfile_config({"time_compress_type_": Compressor.PAA})
+
+
+def test_tsfile_to_df():
+    table = TableSchema("test_table",
+                        [ColumnSchema("device", TSDataType.STRING, 
ColumnCategory.TAG),
+                         ColumnSchema("value", TSDataType.DOUBLE, 
ColumnCategory.FIELD),
+                         ColumnSchema("value2", TSDataType.INT64, 
ColumnCategory.FIELD)])
+    try:
+        with TsFileTableWriter("table_write_to_df.tsfile", table) as writer:
+            tablet = Tablet(["device", "value", "value2"],
+                            [TSDataType.STRING, TSDataType.DOUBLE, 
TSDataType.INT64], 4097)
+            for i in range(4097):
+                tablet.add_timestamp(i, i)
+                tablet.add_value_by_name("device", i, "device" + str(i))
+                tablet.add_value_by_index(1, i, i * 100.0)
+                tablet.add_value_by_index(2, i, i * 100)
+            writer.write_table(tablet)
+        df1 = to_dataframe("table_write_to_df.tsfile")
+        assert df1.shape == (4097, 4)
+        assert df1["value2"].sum() == 100 * (1 + 4096) / 2 * 4096
+        assert df1["time"].dtype == np.int64
+        assert df1["value"].dtype == np.float64
+        assert df1["value2"].dtype == np.int64
+        df2 = to_dataframe("table_write_to_df.tsfile", column_names=["device", 
"value2"])
+        assert df2.shape == (4097, 3)
+        assert df1["value2"].equals(df2["value2"])
+        df3 = to_dataframe("table_write_to_df.tsfile", column_names=["device", 
"value"], max_row_num=8000)
+        assert df3.shape == (4097, 3)
+        with pytest.raises(TableNotExistError):
+            to_dataframe("table_write_to_df.tsfile", "test_tb")
+        with pytest.raises(ColumnNotExistError):
+            to_dataframe("table_write_to_df.tsfile", "test_table", ["device1"])
+    finally:
+        os.remove("table_write_to_df.tsfile")
diff --git a/python/tsfile/__init__.py b/python/tsfile/__init__.py
index 0c5081fa..bf755fce 100644
--- a/python/tsfile/__init__.py
+++ b/python/tsfile/__init__.py
@@ -33,4 +33,5 @@ from .exceptions import *
 from .tsfile_reader import TsFileReaderPy as TsFileReader, ResultSetPy as 
ResultSet
 from .tsfile_writer import TsFileWriterPy as TsFileWriter
 from .tsfile_py_cpp import get_tsfile_config, set_tsfile_config
-from .tsfile_table_writer import TsFileTableWriter
\ No newline at end of file
+from .tsfile_table_writer import TsFileTableWriter
+from .utils import to_dataframe
\ No newline at end of file
diff --git a/python/tsfile/exceptions.py b/python/tsfile/exceptions.py
index 15575954..2a3df283 100644
--- a/python/tsfile/exceptions.py
+++ b/python/tsfile/exceptions.py
@@ -24,81 +24,101 @@ class LibraryError(Exception):
         self.code = code if code is not None else self._default_code
         self.message = context if context is not None else 
self._default_message
         super().__init__(f"[{code}] {self.message}")
+
     def __str__(self):
         return f"{self.code}: {self.message}"
 
+
 class OOMError(LibraryError):
     _default_message = "Out of memory"
     _default_code = 1
 
+
 class NotExistsError(LibraryError):
     _default_message = "Requested resource does not exist"
     _default_code = 2
 
+
 class AlreadyExistsError(LibraryError):
     _default_message = "Resource already exists"
     _default_code = 3
 
+
 class InvalidArgumentError(LibraryError):
     _default_message = "Invalid argument provided"
     _default_code = 4
 
+
 class OutOfRangeError(LibraryError):
     _default_message = "Value out of valid range"
     _default_code = 5
 
+
 class PartialReadError(LibraryError):
     _default_message = "Incomplete data read operation"
     _default_code = 6
 
+
 class FileOpenError(LibraryError):
     _default_message = "Failed to open file"
     _default_code = 28
 
+
 class FileCloseError(LibraryError):
     _default_message = "Failed to close file"
     _default_code = 29
 
+
 class FileWriteError(LibraryError):
     _default_message = "Failed to write to file"
     _default_code = 30
 
+
 class FileReadError(LibraryError):
     _default_message = "Failed to read from file"
     _default_code = 31
 
+
 class FileSyncError(LibraryError):
     _default_message = "Failed to sync file contents"
     _default_code = 32
 
+
 class MetadataError(LibraryError):
     _default_message = "Metadata inconsistency detected"
     _default_code = 33
 
+
 class BufferNotEnoughError(LibraryError):
     _default_message = "Insufficient buffer space"
     _default_code = 36
 
+
 class NotSupportedError(LibraryError):
     _default_message = "Not support yet"
     _default_code = 40
 
+
 class DeviceNotExistError(LibraryError):
     _default_message = "Requested device does not exist"
     _default_code = 44
 
+
 class MeasurementNotExistError(LibraryError):
     _default_message = "Specified measurement does not exist"
     _default_code = 45
 
+
 class InvalidQueryError(LibraryError):
     _default_message = "Malformed query syntax"
     _default_code = 46
 
+
 class CompressionError(LibraryError):
     _default_message = "Data compression/decompression failed"
     _default_code = 48
 
+
 class TableNotExistError(LibraryError):
     _default_message = "Requested table does not exist"
     _default_code = 49
@@ -108,10 +128,17 @@ class TypeNotSupportedError(LibraryError):
     _default_message = "Unsupported data type"
     _default_code = 26
 
+
 class TypeMismatchError(LibraryError):
     _default_message = "Data type mismatch"
     _default_code = 27
 
+
+class ColumnNotExistError(LibraryError):
+    _default_message = "Column does not exist"
+    _default_code = 50
+
+
 ERROR_MAPPING = {
     1: OOMError,
     2: NotExistsError,
@@ -134,9 +161,11 @@ ERROR_MAPPING = {
     46: InvalidQueryError,
     48: CompressionError,
     49: TableNotExistError,
+    50: ColumnNotExistError,
 }
 
-def get_exception(code : int, context : str = None):
+
+def get_exception(code: int, context: str = None):
     if code == 0:
         return None
 
diff --git a/python/tsfile/schema.py b/python/tsfile/schema.py
index c62b0d01..ae7960c5 100644
--- a/python/tsfile/schema.py
+++ b/python/tsfile/schema.py
@@ -114,6 +114,9 @@ class TableSchema:
     def get_columns(self):
         return self.columns
 
+    def get_column_names(self):
+        return [name.get_column_name() for name in self.columns]
+
     def __repr__(self) -> str:
         return f"TableSchema({self.table_name}, {self.columns})"
 
diff --git a/python/tsfile/utils.py b/python/tsfile/utils.py
new file mode 100644
index 00000000..3d236606
--- /dev/null
+++ b/python/tsfile/utils.py
@@ -0,0 +1,67 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import pandas as pd
+
+from tsfile.exceptions import TableNotExistError, ColumnNotExistError
+from tsfile.tsfile_reader import TsFileReaderPy
+
+
+def to_dataframe(file_path: str,
+                 table_name: str = None,
+                 column_names: list[str] = None,
+                 max_row_num: int = None) -> pd.DataFrame:
+    with TsFileReaderPy(file_path) as reader:
+        total_rows = 0
+        table_schema = reader.get_all_table_schemas()
+        if len(table_schema) == 0:
+            raise TableNotExistError("Not found any table in the TsFile.")
+        if table_name is None:
+            # get the first table name by default
+            table_name, columns = next(iter(table_schema.items()))
+        else:
+            if table_name not in table_schema:
+                raise TableNotExistError(table_name)
+            columns = table_schema[table_name]
+
+        column_names_in_file = columns.get_column_names()
+
+        if column_names is not None:
+            for column in column_names:
+                if column not in column_names_in_file:
+                    raise ColumnNotExistError(column)
+        else:
+            column_names = column_names_in_file
+
+        df_list: list[pd.DataFrame] = []
+
+        with reader.query_table(table_name, column_names) as result:
+            while result.next():
+                if max_row_num is not None:
+                    remaining_rows = max_row_num - total_rows
+                    if remaining_rows <= 0:
+                        break
+                    else:
+                        batch_rows = min(remaining_rows, 1024)
+                    df = result.read_data_frame(batch_rows)
+                    total_rows += len(df)
+                else:
+                    df = result.read_data_frame()
+                df_list.append(df)
+        df = pd.concat(df_list, ignore_index=True)
+        return df

Reply via email to