This is an automated email from the ASF dual-hosted git repository. colinlee pushed a commit to branch support_dataframe_to_tsfile in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit ca1ad79efc8169b454c6f79adddff5645f51d83b Author: ColinLee <[email protected]> AuthorDate: Wed Jan 14 20:59:56 2026 +0800 tmp code. --- python/tests/test_write_and_read.py | 225 ++++++++++++++++++++++++++++++++++- python/tsfile/constants.py | 81 ++++++++++++- python/tsfile/tsfile_cpp.pxd | 3 +- python/tsfile/tsfile_py_cpp.pxd | 1 + python/tsfile/tsfile_py_cpp.pyx | 182 ++++++++++++++++++++++++++-- python/tsfile/tsfile_table_writer.py | 72 ++++++++++- python/tsfile/tsfile_writer.pyx | 30 +++-- python/tsfile/utils.py | 116 ++++++++++++++++++ 8 files changed, 678 insertions(+), 32 deletions(-) diff --git a/python/tests/test_write_and_read.py b/python/tests/test_write_and_read.py index b327e2d3..ad8d698a 100644 --- a/python/tests/test_write_and_read.py +++ b/python/tests/test_write_and_read.py @@ -16,6 +16,7 @@ # under the License. # +import os from datetime import date import numpy as np @@ -31,7 +32,7 @@ from tsfile import TimeseriesSchema from tsfile import TsFileTableWriter from tsfile import TsFileWriter, TsFileReader, ColumnCategory from tsfile import to_dataframe -from tsfile.exceptions import TableNotExistError, ColumnNotExistError, NotSupportedError +from tsfile.exceptions import TableNotExistError, ColumnNotExistError, NotSupportedError, TypeMismatchError def test_row_record_write_and_read(): @@ -985,7 +986,227 @@ def test_table_all_datatype_query_to_dataframe_variants(): os.remove(tsfile_path) -import os +def convert_to_nullable_types(df): + """ + Convert DataFrame columns to nullable types to match returned DataFrame from to_dataframe. + This handles the fact that returned DataFrames use nullable types (Int64, Float64, etc.) + to support Null values. + """ + df = df.copy() + for col in df.columns: + dtype = df[col].dtype + if dtype == 'int64': + df[col] = df[col].astype('Int64') + elif dtype == 'int32': + df[col] = df[col].astype('Int32') + elif dtype == 'float64': + df[col] = df[col].astype('Float64') + elif dtype == 'float32': + df[col] = df[col].astype('Float32') + elif dtype == 'bool': + df[col] = df[col].astype('boolean') + return df + + +def test_write_dataframe_basic(): + table = TableSchema("test_table", + [ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("value", TSDataType.DOUBLE, ColumnCategory.FIELD), + ColumnSchema("value2", TSDataType.INT64, ColumnCategory.FIELD)]) + tsfile_path = "test_write_dataframe_basic.tsfile" + try: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + with TsFileTableWriter(tsfile_path, table) as writer: + # Create DataFrame with 'time' column + df = pd.DataFrame({ + 'time': [i for i in range(100)], + 'device': [f"device{i}" for i in range(100)], + 'value': [i * 1.5 for i in range(100)], + 'value2': [i * 10 for i in range(100)] + }) + writer.write_dataframe(df) + + df_read = to_dataframe(tsfile_path, table_name="test_table") + # Sort both DataFrames by time for comparison + df_read = df_read.sort_values('time').reset_index(drop=True) + df_sorted = convert_to_nullable_types(df.sort_values('time').reset_index(drop=True)) + assert df_read.shape == (100, 4) + assert df_read["time"].equals(df_sorted["time"]) + assert df_read["device"].equals(df_sorted["device"]) + assert df_read["value"].equals(df_sorted["value"]) + assert df_read["value2"].equals(df_sorted["value2"]) + finally: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + +def test_write_dataframe_with_index(): + """Test write_dataframe using DataFrame index as time when no 'time' column exists.""" + table = TableSchema("test_table", + [ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("value", TSDataType.DOUBLE, ColumnCategory.FIELD)]) + tsfile_path = "test_write_dataframe_index.tsfile" + try: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + with TsFileTableWriter(tsfile_path, table) as writer: + # Create DataFrame without 'time' column, use index + df = pd.DataFrame({ + 'device': [f"device{i}" for i in range(50)], + 'value': [i * 2.5 for i in range(50)] + }) + df.index = [i * 10 for i in range(50)] # Set index as timestamps + writer.write_dataframe(df) + + # Verify by reading back + df_read = to_dataframe(tsfile_path, table_name="test_table") + # Sort both DataFrames by time for comparison + df_read = df_read.sort_values('time').reset_index(drop=True) + df_sorted = df.sort_index() + df_sorted = convert_to_nullable_types(df_sorted.reset_index(drop=True)) + # Create time series from original index + time_series = pd.Series(df.sort_index().index.values, dtype='Int64') + assert df_read.shape == (50, 3) + # Compare time column with sorted index + assert df_read["time"].equals(time_series) + assert df_read["device"].equals(df_sorted["device"]) + assert df_read["value"].equals(df_sorted["value"]) + finally: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + +def test_write_dataframe_case_insensitive(): + """Test write_dataframe with case-insensitive column matching.""" + table = TableSchema("test_table", + [ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("value", TSDataType.DOUBLE, ColumnCategory.FIELD)]) + tsfile_path = "test_write_dataframe_case.tsfile" + try: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + with TsFileTableWriter(tsfile_path, table) as writer: + # DataFrame with different case column names + df = pd.DataFrame({ + 'Time': [i for i in range(30)], # Capital T + 'Device': [f"device{i}" for i in range(30)], # Capital D + 'VALUE': [i * 3.0 for i in range(30)] # All caps + }) + writer.write_dataframe(df) + + # Verify by reading back + df_read = to_dataframe(tsfile_path, table_name="test_table") + # Sort both DataFrames by time for comparison + df_read = df_read.sort_values('time').reset_index(drop=True) + df_sorted = convert_to_nullable_types(df.sort_values('Time').reset_index(drop=True)) + assert df_read.shape == (30, 3) + assert df_read["time"].equals(df_sorted["Time"]) + assert df_read["device"].equals(df_sorted["Device"]) + assert df_read["value"].equals(df_sorted["VALUE"]) + finally: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + +def test_write_dataframe_column_not_in_schema(): + """Test write_dataframe raises ColumnNotExistError when DataFrame has extra columns.""" + table = TableSchema("test_table", + [ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("value", TSDataType.DOUBLE, ColumnCategory.FIELD)]) + tsfile_path = "test_write_dataframe_extra_col.tsfile" + try: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + with TsFileTableWriter(tsfile_path, table) as writer: + # DataFrame with extra column not in schema + df = pd.DataFrame({ + 'time': [i for i in range(10)], + 'device': [f"device{i}" for i in range(10)], + 'value': [i * 1.0 for i in range(10)], + 'extra_column': [i for i in range(10)] # Not in schema + }) + with pytest.raises(ColumnNotExistError) as exc_info: + writer.write_dataframe(df) + assert "extra_column" in str(exc_info.value) + finally: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + +def test_write_dataframe_type_mismatch(): + """Test write_dataframe raises TypeMismatchError when types are incompatible.""" + table = TableSchema("test_table", + [ColumnSchema("value", TSDataType.STRING, ColumnCategory.FIELD)]) + tsfile_path = "test_write_dataframe_type_mismatch.tsfile" + try: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + with TsFileTableWriter(tsfile_path, table) as writer: + # DataFrame with incompatible type (INT64 vs STRING) + df = pd.DataFrame({ + 'time': [i for i in range(10)], + 'value': [i for i in range(10)] # INT64, but schema expects STRING + }) + with pytest.raises(TypeMismatchError) as exc_info: + writer.write_dataframe(df) + assert "Type mismatches" in str(exc_info.value) + finally: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + +def test_write_dataframe_all_datatypes(): + """Test write_dataframe with all supported data types.""" + table = TableSchema("test_table", + [ColumnSchema("bool_col", TSDataType.BOOLEAN, ColumnCategory.FIELD), + ColumnSchema("int32_col", TSDataType.INT32, ColumnCategory.FIELD), + ColumnSchema("int64_col", TSDataType.INT64, ColumnCategory.FIELD), + ColumnSchema("float_col", TSDataType.FLOAT, ColumnCategory.FIELD), + ColumnSchema("double_col", TSDataType.DOUBLE, ColumnCategory.FIELD), + ColumnSchema("string_col", TSDataType.STRING, ColumnCategory.FIELD), + ColumnSchema("blob_col", TSDataType.BLOB, ColumnCategory.FIELD)]) + tsfile_path = "test_write_dataframe_all_types.tsfile" + try: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + with TsFileTableWriter(tsfile_path, table) as writer: + df = pd.DataFrame({ + 'time': [i for i in range(50)], + 'bool_col': [i % 2 == 0 for i in range(50)], + 'int32_col': pd.Series([i for i in range(50)], dtype='int32'), + 'int64_col': [i * 10 for i in range(50)], + 'float_col': pd.Series([i * 1.5 for i in range(50)], dtype='float32'), + 'double_col': [i * 2.5 for i in range(50)], + 'string_col': [f"str{i}" for i in range(50)], + 'blob_col': [f"blob{i}".encode('utf-8') for i in range(50)] + }) + writer.write_dataframe(df) + + # Verify by reading back + df_read = to_dataframe(tsfile_path, table_name="test_table") + # Sort both DataFrames by time for comparison + df_read = df_read.sort_values('time').reset_index(drop=True) + df_sorted = convert_to_nullable_types(df.sort_values('time').reset_index(drop=True)) + assert df_read.shape == (50, 8) + assert df_read["bool_col"].equals(df_sorted["bool_col"]) + assert df_read["int32_col"].equals(df_sorted["int32_col"]) + assert df_read["int64_col"].equals(df_sorted["int64_col"]) + assert np.allclose(df_read["float_col"], df_sorted["float_col"]) + assert np.allclose(df_read["double_col"], df_sorted["double_col"]) + assert df_read["string_col"].equals(df_sorted["string_col"]) + # BLOB comparison + for i in range(50): + assert df_read["blob_col"].iloc[i] == df_sorted["blob_col"].iloc[i] + finally: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) if __name__ == "__main__": os.chdir(os.path.dirname(os.path.abspath(__file__))) diff --git a/python/tsfile/constants.py b/python/tsfile/constants.py index 7d1f5ff5..e6d9dd6b 100644 --- a/python/tsfile/constants.py +++ b/python/tsfile/constants.py @@ -16,6 +16,7 @@ # under the License. # from enum import unique, IntEnum +import numpy as np @unique @@ -62,13 +63,13 @@ class TSDataType(IntEnum): elif self == TSDataType.INT64: return "Int64" elif self == TSDataType.FLOAT: - return "float32" + return "Float32" elif self == TSDataType.DOUBLE: - return "float64" + return "Float64" elif self == TSDataType.TEXT or self == TSDataType.STRING: return "object" elif self == TSDataType.TIMESTAMP: - return "int64" + return "Int64" elif self == TSDataType.DATE: return "object" elif self == TSDataType.BLOB: @@ -76,6 +77,80 @@ class TSDataType(IntEnum): else: raise ValueError(f"Unknown data type: {self}") + @classmethod + def from_pandas_datatype(cls, dtype): + if dtype is np.bool_: + return cls.BOOLEAN + elif dtype is np.int32: + return cls.INT32 + elif dtype is np.int64: + return cls.INT64 + elif dtype is np.float32: + return cls.FLOAT + elif dtype is np.float64: + return cls.DOUBLE + elif dtype is np.object_: + return cls.STRING + + try: + import pandas as pd + if hasattr(pd, 'StringDtype') and isinstance(dtype, pd.StringDtype): + return cls.STRING + except (ImportError, AttributeError): + pass + + if hasattr(dtype, 'type'): + dtype = dtype.type + if dtype is np.bool_: + return cls.BOOLEAN + elif dtype is np.int32: + return cls.INT32 + elif dtype is np.int64: + return cls.INT64 + elif dtype is np.float32: + return cls.FLOAT + elif dtype is np.float64: + return cls.DOUBLE + elif dtype is np.object_: + # object 类型默认返回 STRING,实际类型需要在 write_dataframe 中检查数据内容 + return cls.STRING + + dtype_str = str(dtype) + + if 'stringdtype' in dtype_str.lower() or dtype_str.startswith('string'): + return cls.STRING + + dtype_map = { + 'bool': cls.BOOLEAN, + 'boolean': cls.BOOLEAN, + 'int32': cls.INT32, + 'Int32': cls.INT32, + 'int64': cls.INT64, + 'Int64': cls.INT64, + 'float32': cls.FLOAT, + 'float64': cls.DOUBLE, + 'bytes': cls.BLOB, + 'object': cls.STRING, + 'string': cls.STRING, + } + + if dtype_str in dtype_map: + return dtype_map[dtype_str] + + dtype_lower = dtype_str.lower() + if dtype_lower in dtype_map: + return dtype_map[dtype_lower] + + if 'object_' in dtype_lower or dtype_str == "<class 'numpy.object_'>": + return cls.STRING + + if dtype_str.startswith('datetime64'): + return cls.TIMESTAMP + + return cls.STRING + + + @unique class TSEncoding(IntEnum): diff --git a/python/tsfile/tsfile_cpp.pxd b/python/tsfile/tsfile_cpp.pxd index 40bff4eb..ab915fef 100644 --- a/python/tsfile/tsfile_cpp.pxd +++ b/python/tsfile/tsfile_cpp.pxd @@ -137,7 +137,8 @@ cdef extern from "./tsfile_cwrapper.h": TSDataType * data_types, int column_num, int max_rows); - Tablet tablet_new(const char** column_names, TSDataType * data_types, int column_num); + Tablet tablet_new(char** column_name_list, TSDataType* data_types, + uint32_t column_num, uint32_t max_rows); ErrorCode tablet_add_timestamp(Tablet tablet, uint32_t row_index, int64_t timestamp); ErrorCode tablet_add_value_by_index_int64_t(Tablet tablet, uint32_t row_index, uint32_t column_index, diff --git a/python/tsfile/tsfile_py_cpp.pxd b/python/tsfile/tsfile_py_cpp.pxd index e44bb588..9ce2f90d 100644 --- a/python/tsfile/tsfile_py_cpp.pxd +++ b/python/tsfile/tsfile_py_cpp.pxd @@ -33,6 +33,7 @@ cdef public api DeviceSchema* to_c_device_schema(object py_schema) cdef public api ColumnSchema* to_c_column_schema(object py_schema) cdef public api TableSchema* to_c_table_schema(object py_schema) cdef public api Tablet to_c_tablet(object tablet) +cdef public api Tablet dataframe_to_c_tablet(object target_name, object dataframe) cdef public api TsRecord to_c_record(object row_record) cdef public api void free_c_table_schema(TableSchema* c_schema) cdef public api void free_c_column_schema(ColumnSchema* c_schema) diff --git a/python/tsfile/tsfile_py_cpp.pyx b/python/tsfile/tsfile_py_cpp.pyx index d9924d7a..84c550f9 100644 --- a/python/tsfile/tsfile_py_cpp.pyx +++ b/python/tsfile/tsfile_py_cpp.pyx @@ -19,6 +19,9 @@ from .date_utils import parse_date_to_int from .tsfile_cpp cimport * +import pandas as pd +import numpy as np + from libc.stdlib cimport free from libc.stdlib cimport malloc from libc.string cimport strdup @@ -220,7 +223,7 @@ cdef Tablet to_c_tablet(object tablet): cdef TSDataType * column_types cdef bytes row_bytes cdef char *raw_str - cdef const char* str_ptr + cdef const char * str_ptr cdef Py_ssize_t raw_len if tablet.get_target_name() is not None: @@ -293,7 +296,7 @@ cdef Tablet to_c_tablet(object tablet): for row in range(max_row_num): if value[row] is not None: py_value = value[row] - str_ptr = PyUnicode_AsUTF8AndSize(py_value, &raw_len) + str_ptr = PyUnicode_AsUTF8AndSize(py_value, &raw_len) tablet_add_value_by_index_string_with_len(ctablet, row, col, str_ptr, raw_len) elif data_type == TS_DATATYPE_BLOB: @@ -304,13 +307,166 @@ cdef Tablet to_c_tablet(object tablet): return ctablet +cdef TSDataType pandas_dtype_to_ts_data_type(object dtype): + return to_c_data_type(TSDataTypePy.from_pandas_datatype(dtype)) + +cdef Tablet dataframe_to_c_tablet(object target_name, object dataframe): + cdef Tablet ctablet + cdef int max_row_num + cdef TSDataType data_type + cdef int64_t timestamp + cdef const char * device_id_c = NULL + cdef char** columns_names + cdef TSDataType * columns_types + cdef char *raw_str + cdef const char * str_ptr + cdef Py_ssize_t raw_len + cdef int column_num + cdef int i, row + cdef object value + cdef object py_value + cdef object value_bytes + + device_id_bytes = PyUnicode_AsUTF8String(target_name.lower()) + device_id_c = device_id_bytes + df_columns = list(dataframe.columns) + use_id_as_time = False + time_column_name = None + + # Find column with lowercase name 'time' + + + for col in df_columns: + if col.lower() == 'time': + time_column_name = col + break + + if time_column_name is None: + use_id_as_time = True + + data_columns = [col for col in df_columns if col.lower() != 'time'] + column_num = len(data_columns) + + if column_num == 0: + raise ValueError("DataFrame must have at least one data column besides 'time'") + + max_row_num = len(dataframe) + + column_types_list = [] + for col_name in data_columns: + pandas_dtype = dataframe[col_name].dtype + ds_type = pandas_dtype_to_ts_data_type(pandas_dtype) + if ds_type == TS_DATATYPE_STRING: + column_series = dataframe[col_name] + first_valid_idx = column_series.first_valid_index() + if first_valid_idx is not None: + first_value = column_series[first_valid_idx] + if isinstance(first_value, bytes): + ds_type = TS_DATATYPE_BLOB + column_types_list.append(ds_type) + + columns_names = <char**> malloc(sizeof(char *) * column_num) + columns_types = <TSDataType *> malloc(sizeof(TSDataType) * column_num) + + for i in range(column_num): + columns_names[i] = strdup(data_columns[i].lower().encode('utf-8')) + columns_types[i] = column_types_list[i] + + ctablet = _tablet_new_with_target_name(device_id_c, columns_names, columns_types, column_num, + max_row_num) + + free(columns_types) + for i in range(column_num): + free(columns_names[i]) + free(columns_names) + + if use_id_as_time: + for row in range(max_row_num): + timestamp_py = dataframe.index[row] + if pd.isna(timestamp_py): + continue + timestamp = <int64_t> timestamp_py + tablet_add_timestamp(ctablet, row, timestamp) + else: + time_values = dataframe[time_column_name].values + for row in range(max_row_num): + timestamp_py = time_values[row] + if pd.isna(timestamp_py): + continue + timestamp = <int64_t> timestamp_py + tablet_add_timestamp(ctablet, row, timestamp) + + for col in range(column_num): + col_name = data_columns[col] + data_type = column_types_list[col] + column_values = dataframe[col_name].values + + # BOOLEAN + if data_type == TS_DATATYPE_BOOLEAN: + for row in range(max_row_num): + value = column_values[row] + if not pd.isna(value): + tablet_add_value_by_index_bool(ctablet, row, col, <bint> value) + # INT32 + elif data_type == TS_DATATYPE_INT32: + for row in range(max_row_num): + value = column_values[row] + if not pd.isna(value): + tablet_add_value_by_index_int32_t(ctablet, row, col, <int32_t> value) + # INT64 + elif data_type == TS_DATATYPE_INT64 or data_type == TS_DATATYPE_TIMESTAMP: + for row in range(max_row_num): + value = column_values[row] + if not pd.isna(value): + tablet_add_value_by_index_int64_t(ctablet, row, col, <int64_t> value) + # FLOAT + elif data_type == TS_DATATYPE_FLOAT: + for row in range(max_row_num): + value = column_values[row] + if not pd.isna(value): + tablet_add_value_by_index_float(ctablet, row, col, <float> value) + # DOUBLE + elif data_type == TS_DATATYPE_DOUBLE: + for row in range(max_row_num): + value = column_values[row] + if not pd.isna(value): + tablet_add_value_by_index_double(ctablet, row, col, <double> value) + # DATE + elif data_type == TS_DATATYPE_DATE: + for row in range(max_row_num): + value = column_values[row] + if not pd.isna(value): + tablet_add_value_by_index_int32_t(ctablet, row, col, parse_date_to_int(value)) + # STRING or TEXT + elif data_type == TS_DATATYPE_STRING or data_type == TS_DATATYPE_TEXT: + for row in range(max_row_num): + value = column_values[row] + if not pd.isna(value): + py_value = str(value) + str_ptr = PyUnicode_AsUTF8AndSize(py_value, &raw_len) + tablet_add_value_by_index_string_with_len(ctablet, row, col, str_ptr, raw_len) + # BLOB + elif data_type == TS_DATATYPE_BLOB: + for row in range(max_row_num): + value = column_values[row] + if not pd.isna(value): + if isinstance(value, bytes): + PyBytes_AsStringAndSize(value, &raw_str, &raw_len) + tablet_add_value_by_index_string_with_len(ctablet, row, col, raw_str, raw_len) + else: + value_bytes = bytes(value) + PyBytes_AsStringAndSize(value_bytes, &raw_str, &raw_len) + tablet_add_value_by_index_string_with_len(ctablet, row, col, raw_str, raw_len) + + return ctablet + cdef TsRecord to_c_record(object row_record): cdef int field_num = row_record.get_fields_num() cdef int64_t timestamp = <int64_t> row_record.get_timestamp() cdef bytes device_id_bytes = PyUnicode_AsUTF8String(row_record.get_device_id()) - cdef const char* device_id = device_id_bytes - cdef const char* str_ptr - cdef char* blob_ptr + cdef const char * device_id = device_id_bytes + cdef const char * str_ptr + cdef char * blob_ptr cdef Py_ssize_t str_len cdef TsRecord record cdef int i @@ -320,9 +476,11 @@ cdef TsRecord to_c_record(object row_record): field = row_record.get_fields()[i] data_type = to_c_data_type(field.get_data_type()) if data_type == TS_DATATYPE_BOOLEAN: - _insert_data_into_ts_record_by_name_bool(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_bool_value()) + _insert_data_into_ts_record_by_name_bool(record, PyUnicode_AsUTF8(field.get_field_name()), + field.get_bool_value()) elif data_type == TS_DATATYPE_INT32 or data_type == TS_DATATYPE_DATE: - _insert_data_into_ts_record_by_name_int32_t(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_int_value()) + _insert_data_into_ts_record_by_name_int32_t(record, PyUnicode_AsUTF8(field.get_field_name()), + field.get_int_value()) elif data_type == TS_DATATYPE_INT64: _insert_data_into_ts_record_by_name_int64_t(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_long_value()) @@ -333,15 +491,17 @@ cdef TsRecord to_c_record(object row_record): _insert_data_into_ts_record_by_name_double(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_double_value()) elif data_type == TS_DATATYPE_FLOAT: - _insert_data_into_ts_record_by_name_float(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_float_value()) + _insert_data_into_ts_record_by_name_float(record, PyUnicode_AsUTF8(field.get_field_name()), + field.get_float_value()) elif data_type == TS_DATATYPE_TEXT or data_type == TS_DATATYPE_STRING: - str_ptr = PyUnicode_AsUTF8AndSize(field.get_string_value(), &str_len) - _insert_data_into_ts_record_by_name_string_with_len(record, PyUnicode_AsUTF8(field.get_field_name()), str_ptr, str_len) + str_ptr = PyUnicode_AsUTF8AndSize(field.get_string_value(), &str_len) + _insert_data_into_ts_record_by_name_string_with_len(record, PyUnicode_AsUTF8(field.get_field_name()), + str_ptr, str_len) elif data_type == TS_DATATYPE_BLOB: if PyBytes_AsStringAndSize(field.get_string_value(), &blob_ptr, &str_len) < 0: raise ValueError("blob not legal") _insert_data_into_ts_record_by_name_string_with_len(record, PyUnicode_AsUTF8(field.get_field_name()), - <const char*> blob_ptr, <uint32_t>str_len) + <const char *> blob_ptr, <uint32_t> str_len) return record # Free c structs' space diff --git a/python/tsfile/tsfile_table_writer.py b/python/tsfile/tsfile_table_writer.py index 28193360..54f759c9 100644 --- a/python/tsfile/tsfile_table_writer.py +++ b/python/tsfile/tsfile_table_writer.py @@ -15,9 +15,12 @@ # specific language governing permissions and limitations # under the License. # +import pandas as pd from tsfile import TableSchema, Tablet, TableNotExistError from tsfile import TsFileWriter +from tsfile.constants import TSDataType +from tsfile.exceptions import ColumnNotExistError, TypeMismatchError class TsFileTableWriter: @@ -31,7 +34,7 @@ class TsFileTableWriter: according to that schema, and serialize this data into a TsFile. """ - def __init__(self, path: str, table_schema: TableSchema, memory_threshold = 128 * 1024 * 1024): + def __init__(self, path: str, table_schema: TableSchema, memory_threshold=128 * 1024 * 1024): """ :param path: The path of tsfile, will create if it doesn't exist. :param table_schema: describes the schema of the tables they want to write. @@ -39,7 +42,7 @@ class TsFileTableWriter: """ self.writer = TsFileWriter(path, memory_threshold) self.writer.register_table(table_schema) - self.exclusive_table_name_ = table_schema.get_table_name() + self.tableSchema = table_schema def write_table(self, tablet: Tablet): """ @@ -49,11 +52,72 @@ class TsFileTableWriter: :raise: TableNotExistError if table does not exist or tablet's table_name does not match tableschema. """ if tablet.get_target_name() is None: - tablet.set_table_name(self.exclusive_table_name_) - elif self.exclusive_table_name_ is not None and tablet.get_target_name() != self.exclusive_table_name_: + tablet.set_table_name(self.tableSchema.get_table_name()) + elif (self.tableSchema.get_table_name() is not None + and tablet.get_target_name() != self.tableSchema.get_table_name()): raise TableNotExistError self.writer.write_table(tablet) + def write_dataframe(self, dataframe: pd.DataFrame): + """ + Write a pandas DataFrame into table in tsfile. + :param dataframe: pandas DataFrame with 'time' column and data columns matching schema. + :return: no return value. + :raise: ColumnNotExistError if DataFrame columns don't match schema. + :raise: TypeMismatchError if DataFrame column types are incompatible with schema. + """ + if dataframe is None or dataframe.empty: + pass + + # Create mapping from lowercase column name to original column name + df_column_name_map = {col.lower(): col for col in dataframe.columns if col.lower() != 'time'} + df_columns = list(df_column_name_map.keys()) + + schema_column_names = set(self.tableSchema.get_column_names()) + df_columns_set = set(df_columns) + + extra_columns = df_columns_set - schema_column_names + if extra_columns: + raise ColumnNotExistError( + code=50, + context=f"DataFrame has columns not in schema: {', '.join(sorted(extra_columns))}" + ) + + schema_column_map = { + col.get_column_name(): col for col in self.tableSchema.get_columns() + } + + type_mismatches = [] + for col_name in df_columns: + df_col_name_original = df_column_name_map[col_name] + + df_dtype = dataframe[df_col_name_original].dtype + df_ts_type = TSDataType.from_pandas_datatype(df_dtype) + + if df_ts_type == TSDataType.STRING and (df_dtype == 'object' or str(df_dtype) == "<class 'numpy.object_'>"): + column_series = dataframe[df_col_name_original] + first_valid_idx = column_series.first_valid_index() + if first_valid_idx is not None: + first_value = column_series[first_valid_idx] + if isinstance(first_value, bytes): + df_ts_type = TSDataType.BLOB + + schema_col = schema_column_map[col_name] + expected_ts_type = schema_col.get_data_type() + + if df_ts_type != expected_ts_type: + type_mismatches.append( + f"Column '{col_name}': expected {expected_ts_type.name}, got {df_ts_type.name}" + ) + + if type_mismatches: + raise TypeMismatchError( + code=27, + context=f"Type mismatches: {'; '.join(type_mismatches)}" + ) + + self.writer.write_dataframe(self.tableSchema.get_table_name(), dataframe) + def close(self): """ Close TsFileTableWriter and will flush data automatically. diff --git a/python/tsfile/tsfile_writer.pyx b/python/tsfile/tsfile_writer.pyx index 20199195..c558984e 100644 --- a/python/tsfile/tsfile_writer.pyx +++ b/python/tsfile/tsfile_writer.pyx @@ -15,21 +15,21 @@ # specific language governing permissions and limitations # under the License. # - -#cython: language_level=3 - -from .tsfile_cpp cimport * -from .tsfile_py_cpp cimport * +import pandas from tsfile.row_record import RowRecord -from tsfile.schema import TimeseriesSchema as TimeseriesSchemaPy, DeviceSchema as DeviceSchemaPy from tsfile.schema import TableSchema as TableSchemaPy +from tsfile.schema import TimeseriesSchema as TimeseriesSchemaPy, DeviceSchema as DeviceSchemaPy from tsfile.tablet import Tablet as TabletPy +from .tsfile_cpp cimport * +from .tsfile_py_cpp cimport * + +#cython: language_level=3 cdef class TsFileWriterPy: cdef TsFileWriter writer - def __init__(self, pathname:str, memory_threshold:int = 128 * 1024 * 1024): + def __init__(self, pathname: str, memory_threshold: int = 128 * 1024 * 1024): self.writer = tsfile_writer_new_c(pathname, memory_threshold) def register_timeseries(self, device_name : str, timeseries_schema : TimeseriesSchemaPy): @@ -38,7 +38,7 @@ cdef class TsFileWriterPy: device_name: device name of the timeseries timeseries_schema: measurement's name/datatype/encoding/compressor """ - cdef TimeseriesSchema* c_schema = to_c_timeseries_schema(timeseries_schema) + cdef TimeseriesSchema * c_schema = to_c_timeseries_schema(timeseries_schema) cdef ErrorCode errno try: errno = tsfile_writer_register_timeseries_py_cpp(self.writer, device_name, c_schema) @@ -51,7 +51,7 @@ cdef class TsFileWriterPy: Register a device with tsfile writer. device_schema: the device definition, including device_name, some measurements' schema. """ - cdef DeviceSchema* device_schema_c = to_c_device_schema(device_schema) + cdef DeviceSchema * device_schema_c = to_c_device_schema(device_schema) cdef ErrorCode errno try: errno = tsfile_writer_register_device_py_cpp(self.writer, device_schema_c) @@ -64,7 +64,7 @@ cdef class TsFileWriterPy: Register a table with tsfile writer. table_schema: the table definition, include table_name, columns' schema. """ - cdef TableSchema* c_schema = to_c_table_schema(table_schema) + cdef TableSchema * c_schema = to_c_table_schema(table_schema) cdef ErrorCode errno try: errno = tsfile_writer_register_table_py_cpp(self.writer, c_schema) @@ -86,6 +86,15 @@ cdef class TsFileWriterPy: finally: free_c_tablet(ctablet) + def write_dataframe(self, target_table: str, dataframe: pandas.DataFrame): + cdef Tablet ctablet = dataframe_to_c_tablet(target_table, dataframe) + cdef ErrorCode errno + try: + errno = _tsfile_writer_write_table(self.writer, ctablet) + check_error(errno) + finally: + free_c_tablet(ctablet) + def write_row_record(self, record : RowRecord): """ Write a record into tsfile with tsfile writer. @@ -143,4 +152,3 @@ cdef class TsFileWriterPy: def __exit__(self, exc_type, exc_val, exc_tb): self.close() - diff --git a/python/tsfile/utils.py b/python/tsfile/utils.py index d27a0fae..a8ed5601 100644 --- a/python/tsfile/utils.py +++ b/python/tsfile/utils.py @@ -20,9 +20,11 @@ from typing import Optional import numpy as np import pandas as pd +from pandas.core.dtypes.common import is_integer_dtype from tsfile.exceptions import TableNotExistError, ColumnNotExistError from tsfile.tsfile_reader import TsFileReaderPy +from tsfile import ColumnSchema, TableSchema, ColumnCategory, TSDataType, TsFileTableWriter def to_dataframe(file_path: str, @@ -159,3 +161,117 @@ def to_dataframe(file_path: str, return df else: return pd.DataFrame() + + +def to_tsfile(dataframe: pd.DataFrame, + file_path: str, + table_name: Optional[str] = None, + time_column: Optional[str] = None, + tag_column: Optional[list[str]] = None, + ): + """ + Write a pandas DataFrame to a TsFile by inferring the table schema from the DataFrame. + + This function automatically infers the table schema based on the DataFrame's column + names and data types, then writes the data to a TsFile. + + Parameters + ---------- + dataframe : pd.DataFrame + The pandas DataFrame to write to TsFile. + - If a 'time' column (case-insensitive) exists, it will be used as the time column. + - Otherwise, the DataFrame index will be used as timestamps. + - All other columns will be treated as data columns. + + file_path : str + Path to the TsFile to write. Will be created if it doesn't exist. + + table_name : Optional[str], default None + Name of the table. If None, defaults to "table". + + time_column : Optional[str], default None + Name of the time column. If None, will look for a column named 'time' (case-insensitive), + or use the DataFrame index if no 'time' column is found. + + tag_column : Optional[list[str]], default None + List of column names to be treated as TAG columns. All other columns will be FIELD columns. + If None, all columns are treated as FIELD columns. + + Returns + ------- + None + + Raises + ------ + ValueError + If the DataFrame is empty or has no data columns. + """ + if dataframe is None or dataframe.empty: + raise ValueError("DataFrame cannot be None or empty") + + # Determine table name + if table_name is None: + table_name = "table" + + # Determine time column + time_col_name = None + if time_column is not None: + # Check if specified time column exists + if time_column not in dataframe.columns: + raise ValueError(f"Time column '{time_column}' not found in DataFrame") + # Check if time column is integer type (int64 or int) + if not is_integer_dtype(dataframe[time_column].dtype): + raise ValueError(f"Time column '{time_column}' must be integer type (int64 or int), got {dataframe[time_column].dtype}") + time_col_name = time_column + else: + # Look for 'time' column (case-insensitive) + for col in dataframe.columns: + if col.lower() == 'time': + # Check if time column is integer type + if is_integer_dtype(dataframe[col].dtype): + time_col_name = col + break + + # Get data columns (excluding time column) + data_columns = [col for col in dataframe.columns if col != time_col_name] + + if len(data_columns) == 0: + raise ValueError("DataFrame must have at least one data column besides the time column") + + # Normalize tag_column list (convert to lowercase for comparison) + tag_columns_lower = [] + if tag_column is not None: + for tag_col in tag_column: + if tag_col not in dataframe.columns: + raise ValueError(f"Tag column '{tag_col}' not found in DataFrame") + tag_columns_lower.append(tag_col.lower()) + + # Infer schema from DataFrame columns + column_schemas = [] + for col_name in data_columns: + # Infer data type from DataFrame column + col_dtype = dataframe[col_name].dtype + ts_data_type = TSDataType.from_pandas_datatype(col_dtype) + + # Determine category (TAG or FIELD) + if col_name.lower() in tag_columns_lower: + category = ColumnCategory.TAG + else: + category = ColumnCategory.FIELD + + column_schemas.append(ColumnSchema(col_name, ts_data_type, category)) + + # Create table schema + table_schema = TableSchema(table_name, column_schemas) + + # Prepare DataFrame for writing + # If time column is specified or found, rename it to 'time' if needed + # If no time column is found, write_dataframe will handle it by using index + df_to_write = dataframe.copy() + if time_col_name is not None and time_col_name != 'time': + # Rename time column to 'time' for consistency + df_to_write = df_to_write.rename(columns={time_col_name: 'time'}) + + # Write DataFrame to TsFile + with TsFileTableWriter(file_path, table_schema) as writer: + writer.write_dataframe(df_to_write) \ No newline at end of file
