This is an automated email from the ASF dual-hosted git repository.
colinlee pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new de31a9a0 support python to_dataframe. (#624)
de31a9a0 is described below
commit de31a9a045b6cc3087870a3a922ff781c5fe0118
Author: Colin Lee <[email protected]>
AuthorDate: Mon Nov 3 13:49:32 2025 +0800
support python to_dataframe. (#624)
---
python/tests/test_write_and_read.py | 43 ++++++++++++++++++++++--
python/tsfile/__init__.py | 3 +-
python/tsfile/exceptions.py | 31 ++++++++++++++++-
python/tsfile/schema.py | 3 ++
python/tsfile/utils.py | 67 +++++++++++++++++++++++++++++++++++++
5 files changed, 142 insertions(+), 5 deletions(-)
diff --git a/python/tests/test_write_and_read.py
b/python/tests/test_write_and_read.py
index 78731899..e5c87ab9 100644
--- a/python/tests/test_write_and_read.py
+++ b/python/tests/test_write_and_read.py
@@ -18,15 +18,17 @@
import os
+import numpy as np
import pytest
-
-from tsfile import ColumnSchema, TableSchema, TSEncoding, NotSupportedError
+from tsfile import ColumnSchema, TableSchema, TSEncoding
+from tsfile import Compressor
from tsfile import TSDataType
from tsfile import Tablet, RowRecord, Field
from tsfile import TimeseriesSchema
from tsfile import TsFileTableWriter
from tsfile import TsFileWriter, TsFileReader, ColumnCategory
-from tsfile import Compressor
+from tsfile import to_dataframe
+from tsfile.exceptions import TableNotExistError, ColumnNotExistError,
NotSupportedError
def test_row_record_write_and_read():
@@ -60,6 +62,7 @@ def test_row_record_write_and_read():
if os.path.exists("record_write_and_read.tsfile"):
os.remove("record_write_and_read.tsfile")
+
@pytest.mark.skip(reason="API not match")
def test_tablet_write_and_read():
try:
@@ -268,3 +271,37 @@ def test_tsfile_config():
set_tsfile_config({"float_encoding_type_": TSEncoding.BITMAP})
with pytest.raises(NotSupportedError):
set_tsfile_config({"time_compress_type_": Compressor.PAA})
+
+
+def test_tsfile_to_df():
+ table = TableSchema("test_table",
+ [ColumnSchema("device", TSDataType.STRING,
ColumnCategory.TAG),
+ ColumnSchema("value", TSDataType.DOUBLE,
ColumnCategory.FIELD),
+ ColumnSchema("value2", TSDataType.INT64,
ColumnCategory.FIELD)])
+ try:
+ with TsFileTableWriter("table_write_to_df.tsfile", table) as writer:
+ tablet = Tablet(["device", "value", "value2"],
+ [TSDataType.STRING, TSDataType.DOUBLE,
TSDataType.INT64], 4097)
+ for i in range(4097):
+ tablet.add_timestamp(i, i)
+ tablet.add_value_by_name("device", i, "device" + str(i))
+ tablet.add_value_by_index(1, i, i * 100.0)
+ tablet.add_value_by_index(2, i, i * 100)
+ writer.write_table(tablet)
+ df1 = to_dataframe("table_write_to_df.tsfile")
+ assert df1.shape == (4097, 4)
+ assert df1["value2"].sum() == 100 * (1 + 4096) / 2 * 4096
+ assert df1["time"].dtype == np.int64
+ assert df1["value"].dtype == np.float64
+ assert df1["value2"].dtype == np.int64
+ df2 = to_dataframe("table_write_to_df.tsfile", column_names=["device",
"value2"])
+ assert df2.shape == (4097, 3)
+ assert df1["value2"].equals(df2["value2"])
+ df3 = to_dataframe("table_write_to_df.tsfile", column_names=["device",
"value"], max_row_num=8000)
+ assert df3.shape == (4097, 3)
+ with pytest.raises(TableNotExistError):
+ to_dataframe("table_write_to_df.tsfile", "test_tb")
+ with pytest.raises(ColumnNotExistError):
+ to_dataframe("table_write_to_df.tsfile", "test_table", ["device1"])
+ finally:
+ os.remove("table_write_to_df.tsfile")
diff --git a/python/tsfile/__init__.py b/python/tsfile/__init__.py
index 0c5081fa..bf755fce 100644
--- a/python/tsfile/__init__.py
+++ b/python/tsfile/__init__.py
@@ -33,4 +33,5 @@ from .exceptions import *
from .tsfile_reader import TsFileReaderPy as TsFileReader, ResultSetPy as
ResultSet
from .tsfile_writer import TsFileWriterPy as TsFileWriter
from .tsfile_py_cpp import get_tsfile_config, set_tsfile_config
-from .tsfile_table_writer import TsFileTableWriter
\ No newline at end of file
+from .tsfile_table_writer import TsFileTableWriter
+from .utils import to_dataframe
\ No newline at end of file
diff --git a/python/tsfile/exceptions.py b/python/tsfile/exceptions.py
index 15575954..2a3df283 100644
--- a/python/tsfile/exceptions.py
+++ b/python/tsfile/exceptions.py
@@ -24,81 +24,101 @@ class LibraryError(Exception):
self.code = code if code is not None else self._default_code
self.message = context if context is not None else
self._default_message
super().__init__(f"[{code}] {self.message}")
+
def __str__(self):
return f"{self.code}: {self.message}"
+
class OOMError(LibraryError):
_default_message = "Out of memory"
_default_code = 1
+
class NotExistsError(LibraryError):
_default_message = "Requested resource does not exist"
_default_code = 2
+
class AlreadyExistsError(LibraryError):
_default_message = "Resource already exists"
_default_code = 3
+
class InvalidArgumentError(LibraryError):
_default_message = "Invalid argument provided"
_default_code = 4
+
class OutOfRangeError(LibraryError):
_default_message = "Value out of valid range"
_default_code = 5
+
class PartialReadError(LibraryError):
_default_message = "Incomplete data read operation"
_default_code = 6
+
class FileOpenError(LibraryError):
_default_message = "Failed to open file"
_default_code = 28
+
class FileCloseError(LibraryError):
_default_message = "Failed to close file"
_default_code = 29
+
class FileWriteError(LibraryError):
_default_message = "Failed to write to file"
_default_code = 30
+
class FileReadError(LibraryError):
_default_message = "Failed to read from file"
_default_code = 31
+
class FileSyncError(LibraryError):
_default_message = "Failed to sync file contents"
_default_code = 32
+
class MetadataError(LibraryError):
_default_message = "Metadata inconsistency detected"
_default_code = 33
+
class BufferNotEnoughError(LibraryError):
_default_message = "Insufficient buffer space"
_default_code = 36
+
class NotSupportedError(LibraryError):
_default_message = "Not support yet"
_default_code = 40
+
class DeviceNotExistError(LibraryError):
_default_message = "Requested device does not exist"
_default_code = 44
+
class MeasurementNotExistError(LibraryError):
_default_message = "Specified measurement does not exist"
_default_code = 45
+
class InvalidQueryError(LibraryError):
_default_message = "Malformed query syntax"
_default_code = 46
+
class CompressionError(LibraryError):
_default_message = "Data compression/decompression failed"
_default_code = 48
+
class TableNotExistError(LibraryError):
_default_message = "Requested table does not exist"
_default_code = 49
@@ -108,10 +128,17 @@ class TypeNotSupportedError(LibraryError):
_default_message = "Unsupported data type"
_default_code = 26
+
class TypeMismatchError(LibraryError):
_default_message = "Data type mismatch"
_default_code = 27
+
+class ColumnNotExistError(LibraryError):
+ _default_message = "Column does not exist"
+ _default_code = 50
+
+
ERROR_MAPPING = {
1: OOMError,
2: NotExistsError,
@@ -134,9 +161,11 @@ ERROR_MAPPING = {
46: InvalidQueryError,
48: CompressionError,
49: TableNotExistError,
+ 50: ColumnNotExistError,
}
-def get_exception(code : int, context : str = None):
+
+def get_exception(code: int, context: str = None):
if code == 0:
return None
diff --git a/python/tsfile/schema.py b/python/tsfile/schema.py
index c62b0d01..ae7960c5 100644
--- a/python/tsfile/schema.py
+++ b/python/tsfile/schema.py
@@ -114,6 +114,9 @@ class TableSchema:
def get_columns(self):
return self.columns
+ def get_column_names(self):
+ return [name.get_column_name() for name in self.columns]
+
def __repr__(self) -> str:
return f"TableSchema({self.table_name}, {self.columns})"
diff --git a/python/tsfile/utils.py b/python/tsfile/utils.py
new file mode 100644
index 00000000..3d236606
--- /dev/null
+++ b/python/tsfile/utils.py
@@ -0,0 +1,67 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import pandas as pd
+
+from tsfile.exceptions import TableNotExistError, ColumnNotExistError
+from tsfile.tsfile_reader import TsFileReaderPy
+
+
+def to_dataframe(file_path: str,
+ table_name: str = None,
+ column_names: list[str] = None,
+ max_row_num: int = None) -> pd.DataFrame:
+ with TsFileReaderPy(file_path) as reader:
+ total_rows = 0
+ table_schema = reader.get_all_table_schemas()
+ if len(table_schema) == 0:
+ raise TableNotExistError("Not found any table in the TsFile.")
+ if table_name is None:
+ # get the first table name by default
+ table_name, columns = next(iter(table_schema.items()))
+ else:
+ if table_name not in table_schema:
+ raise TableNotExistError(table_name)
+ columns = table_schema[table_name]
+
+ column_names_in_file = columns.get_column_names()
+
+ if column_names is not None:
+ for column in column_names:
+ if column not in column_names_in_file:
+ raise ColumnNotExistError(column)
+ else:
+ column_names = column_names_in_file
+
+ df_list: list[pd.DataFrame] = []
+
+ with reader.query_table(table_name, column_names) as result:
+ while result.next():
+ if max_row_num is not None:
+ remaining_rows = max_row_num - total_rows
+ if remaining_rows <= 0:
+ break
+ else:
+ batch_rows = min(remaining_rows, 1024)
+ df = result.read_data_frame(batch_rows)
+ total_rows += len(df)
+ else:
+ df = result.read_data_frame()
+ df_list.append(df)
+ df = pd.concat(df_list, ignore_index=True)
+ return df