This is an automated email from the ASF dual-hosted git repository. colinlee pushed a commit to branch support_dataframe_to_tsfile in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit dad7c61a3d7a8144ec66b72283c6312c7c418e61 Author: ColinLee <[email protected]> AuthorDate: Wed Jan 14 21:35:53 2026 +0800 support to_tsfile. --- python/tests/test_dataframe.py | 276 +++++++++++++++++++++ python/tests/test_to_tsfile.py | 376 +++++++++++++++++++++++++++++ python/tests/test_write_and_read.py | 455 +---------------------------------- python/tsfile/tsfile_table_writer.py | 3 +- python/tsfile/utils.py | 13 +- 5 files changed, 670 insertions(+), 453 deletions(-) diff --git a/python/tests/test_dataframe.py b/python/tests/test_dataframe.py new file mode 100644 index 00000000..9c57e154 --- /dev/null +++ b/python/tests/test_dataframe.py @@ -0,0 +1,276 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import os + +import numpy as np +import pandas as pd +import pytest +from pandas.core.dtypes.common import is_integer_dtype + +from tsfile import ColumnSchema, TableSchema, TSDataType +from tsfile import TsFileTableWriter, ColumnCategory +from tsfile import to_dataframe +from tsfile.exceptions import ColumnNotExistError, TypeMismatchError + + +def convert_to_nullable_types(df): + """ + Convert DataFrame columns to nullable types to match returned DataFrame from to_dataframe. 
+ This handles the fact that returned DataFrames use nullable types (Int64, Float64, etc.) + to support Null values. + """ + df = df.copy() + for col in df.columns: + dtype = df[col].dtype + if dtype == 'int64': + df[col] = df[col].astype('Int64') + elif dtype == 'int32': + df[col] = df[col].astype('Int32') + elif dtype == 'float64': + df[col] = df[col].astype('Float64') + elif dtype == 'float32': + df[col] = df[col].astype('Float32') + elif dtype == 'bool': + df[col] = df[col].astype('boolean') + return df + + +def test_write_dataframe_basic(): + """Test basic write_dataframe functionality.""" + table = TableSchema("test_table", + [ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("value", TSDataType.DOUBLE, ColumnCategory.FIELD), + ColumnSchema("value2", TSDataType.INT64, ColumnCategory.FIELD)]) + tsfile_path = "test_write_dataframe_basic.tsfile" + try: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + with TsFileTableWriter(tsfile_path, table) as writer: + # Create DataFrame with 'time' column + df = pd.DataFrame({ + 'time': [i for i in range(100)], + 'device': [f"device{i}" for i in range(100)], + 'value': [i * 1.5 for i in range(100)], + 'value2': [i * 10 for i in range(100)] + }) + writer.write_dataframe(df) + + df_read = to_dataframe(tsfile_path, table_name="test_table") + # Sort both DataFrames by time for comparison + df_read = df_read.sort_values('time').reset_index(drop=True) + df_sorted = convert_to_nullable_types(df.sort_values('time').reset_index(drop=True)) + assert df_read.shape == (100, 4) + assert df_read["time"].equals(df_sorted["time"]) + assert df_read["device"].equals(df_sorted["device"]) + assert df_read["value"].equals(df_sorted["value"]) + assert df_read["value2"].equals(df_sorted["value2"]) + finally: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + +def test_write_dataframe_with_index(): + """Test write_dataframe using DataFrame index as time when no 'time' column exists.""" + table = 
TableSchema("test_table", + [ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("value", TSDataType.DOUBLE, ColumnCategory.FIELD)]) + tsfile_path = "test_write_dataframe_index.tsfile" + try: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + with TsFileTableWriter(tsfile_path, table) as writer: + # Create DataFrame without 'time' column, use index + df = pd.DataFrame({ + 'device': [f"device{i}" for i in range(50)], + 'value': [i * 2.5 for i in range(50)] + }) + df.index = [i * 10 for i in range(50)] # Set index as timestamps + writer.write_dataframe(df) + + # Verify by reading back + df_read = to_dataframe(tsfile_path, table_name="test_table") + # Sort both DataFrames by time for comparison + df_read = df_read.sort_values('time').reset_index(drop=True) + df_sorted = df.sort_index() + df_sorted = convert_to_nullable_types(df_sorted.reset_index(drop=True)) + # Create time series from original index + time_series = pd.Series(df.sort_index().index.values, dtype='Int64') + assert df_read.shape == (50, 3) + # Compare time column with sorted index + assert df_read["time"].equals(time_series) + assert df_read["device"].equals(df_sorted["device"]) + assert df_read["value"].equals(df_sorted["value"]) + finally: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + +def test_write_dataframe_case_insensitive(): + """Test write_dataframe with case-insensitive column matching.""" + table = TableSchema("test_table", + [ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("value", TSDataType.DOUBLE, ColumnCategory.FIELD)]) + tsfile_path = "test_write_dataframe_case.tsfile" + try: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + with TsFileTableWriter(tsfile_path, table) as writer: + # DataFrame with different case column names + df = pd.DataFrame({ + 'Time': [i for i in range(30)], # Capital T + 'Device': [f"device{i}" for i in range(30)], # Capital D + 'VALUE': [i * 3.0 for i in range(30)] # 
All caps + }) + writer.write_dataframe(df) + + # Verify by reading back + df_read = to_dataframe(tsfile_path, table_name="test_table") + # Sort both DataFrames by time for comparison + df_read = df_read.sort_values('time').reset_index(drop=True) + df_sorted = convert_to_nullable_types(df.sort_values('Time').reset_index(drop=True)) + assert df_read.shape == (30, 3) + assert df_read["time"].equals(df_sorted["Time"]) + assert df_read["device"].equals(df_sorted["Device"]) + assert df_read["value"].equals(df_sorted["VALUE"]) + finally: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + +def test_write_dataframe_column_not_in_schema(): + """Test write_dataframe raises ColumnNotExistError when DataFrame has extra columns.""" + table = TableSchema("test_table", + [ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("value", TSDataType.DOUBLE, ColumnCategory.FIELD)]) + tsfile_path = "test_write_dataframe_extra_col.tsfile" + try: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + with TsFileTableWriter(tsfile_path, table) as writer: + # DataFrame with extra column not in schema + df = pd.DataFrame({ + 'time': [i for i in range(10)], + 'device': [f"device{i}" for i in range(10)], + 'value': [i * 1.0 for i in range(10)], + 'extra_column': [i for i in range(10)] # Not in schema + }) + with pytest.raises(ColumnNotExistError) as exc_info: + writer.write_dataframe(df) + assert "extra_column" in str(exc_info.value) + finally: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + +def test_write_dataframe_type_mismatch(): + """Test write_dataframe raises TypeMismatchError when types are incompatible.""" + table = TableSchema("test_table", + [ColumnSchema("value", TSDataType.STRING, ColumnCategory.FIELD)]) + tsfile_path = "test_write_dataframe_type_mismatch.tsfile" + try: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + with TsFileTableWriter(tsfile_path, table) as writer: + # DataFrame with incompatible type 
(INT64 vs STRING) + df = pd.DataFrame({ + 'time': [i for i in range(10)], + 'value': [i for i in range(10)] # INT64, but schema expects STRING + }) + with pytest.raises(TypeMismatchError) as exc_info: + writer.write_dataframe(df) + assert "Type mismatches" in str(exc_info.value) + finally: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + +def test_write_dataframe_all_datatypes(): + """Test write_dataframe with all supported data types.""" + table = TableSchema("test_table", + [ColumnSchema("bool_col", TSDataType.BOOLEAN, ColumnCategory.FIELD), + ColumnSchema("int32_col", TSDataType.INT32, ColumnCategory.FIELD), + ColumnSchema("int64_col", TSDataType.INT64, ColumnCategory.FIELD), + ColumnSchema("float_col", TSDataType.FLOAT, ColumnCategory.FIELD), + ColumnSchema("double_col", TSDataType.DOUBLE, ColumnCategory.FIELD), + ColumnSchema("string_col", TSDataType.STRING, ColumnCategory.FIELD), + ColumnSchema("blob_col", TSDataType.BLOB, ColumnCategory.FIELD)]) + tsfile_path = "test_write_dataframe_all_types.tsfile" + try: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + with TsFileTableWriter(tsfile_path, table) as writer: + df = pd.DataFrame({ + 'time': [i for i in range(50)], + 'bool_col': [i % 2 == 0 for i in range(50)], + 'int32_col': pd.Series([i for i in range(50)], dtype='int32'), + 'int64_col': [i * 10 for i in range(50)], + 'float_col': pd.Series([i * 1.5 for i in range(50)], dtype='float32'), + 'double_col': [i * 2.5 for i in range(50)], + 'string_col': [f"str{i}" for i in range(50)], + 'blob_col': [f"blob{i}".encode('utf-8') for i in range(50)] + }) + writer.write_dataframe(df) + + # Verify by reading back + df_read = to_dataframe(tsfile_path, table_name="test_table") + # Sort both DataFrames by time for comparison + df_read = df_read.sort_values('time').reset_index(drop=True) + df_sorted = convert_to_nullable_types(df.sort_values('time').reset_index(drop=True)) + assert df_read.shape == (50, 8) + assert 
df_read["bool_col"].equals(df_sorted["bool_col"]) + assert df_read["int32_col"].equals(df_sorted["int32_col"]) + assert df_read["int64_col"].equals(df_sorted["int64_col"]) + assert np.allclose(df_read["float_col"], df_sorted["float_col"]) + assert np.allclose(df_read["double_col"], df_sorted["double_col"]) + assert df_read["string_col"].equals(df_sorted["string_col"]) + # BLOB comparison + for i in range(50): + assert df_read["blob_col"].iloc[i] == df_sorted["blob_col"].iloc[i] + finally: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + +def test_write_dataframe_empty(): + """Test write_dataframe with empty DataFrame.""" + table = TableSchema("test_table", + [ColumnSchema("value", TSDataType.DOUBLE, ColumnCategory.FIELD)]) + tsfile_path = "test_write_dataframe_empty.tsfile" + try: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + with TsFileTableWriter(tsfile_path, table) as writer: + # Empty DataFrame + df = pd.DataFrame({ + 'time': [], + 'value': [] + }) + # Should not raise error (current implementation just passes) + with pytest.raises(ValueError) as err: + writer.write_dataframe(df) + + finally: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) diff --git a/python/tests/test_to_tsfile.py b/python/tests/test_to_tsfile.py new file mode 100644 index 00000000..0e4adcd2 --- /dev/null +++ b/python/tests/test_to_tsfile.py @@ -0,0 +1,376 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import os + +import numpy as np +import pandas as pd +import pytest +from pandas.core.dtypes.common import is_integer_dtype + +from tsfile import to_dataframe, ColumnCategory +from tsfile.utils import to_tsfile + + +def convert_to_nullable_types(df): + """ + Convert DataFrame columns to nullable types to match returned DataFrame from to_dataframe. + This handles the fact that returned DataFrames use nullable types (Int64, Float64, etc.) + to support Null values. + """ + df = df.copy() + for col in df.columns: + dtype = df[col].dtype + if dtype == 'int64': + df[col] = df[col].astype('Int64') + elif dtype == 'int32': + df[col] = df[col].astype('Int32') + elif dtype == 'float64': + df[col] = df[col].astype('Float64') + elif dtype == 'float32': + df[col] = df[col].astype('Float32') + elif dtype == 'bool': + df[col] = df[col].astype('boolean') + return df + + +def test_to_tsfile_basic(): + """Test basic to_tsfile functionality with time column.""" + tsfile_path = "test_to_tsfile_basic.tsfile" + try: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + # Create DataFrame with 'time' column + df = pd.DataFrame({ + 'time': [i for i in range(100)], + 'device': [f"device{i}" for i in range(100)], + 'value': [i * 1.5 for i in range(100)], + 'value2': [i * 10 for i in range(100)] + }) + + to_tsfile(df, tsfile_path, table_name="test_table") + + # Verify by reading back + df_read = to_dataframe(tsfile_path, table_name="test_table") + df_read = df_read.sort_values('time').reset_index(drop=True) + df_sorted = 
convert_to_nullable_types(df.sort_values('time').reset_index(drop=True)) + + assert df_read.shape == (100, 4) + assert df_read["time"].equals(df_sorted["time"]) + assert df_read["device"].equals(df_sorted["device"]) + assert df_read["value"].equals(df_sorted["value"]) + assert df_read["value2"].equals(df_sorted["value2"]) + finally: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + +def test_to_tsfile_with_index(): + """Test to_tsfile using DataFrame index as time when no 'time' column exists.""" + tsfile_path = "test_to_tsfile_index.tsfile" + try: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + # Create DataFrame without 'time' column, use index + df = pd.DataFrame({ + 'device': [f"device{i}" for i in range(50)], + 'value': [i * 2.5 for i in range(50)] + }) + df.index = [i * 10 for i in range(50)] # Set index as timestamps + + to_tsfile(df, tsfile_path, table_name="test_table") + + # Verify by reading back + df_read = to_dataframe(tsfile_path, table_name="test_table") + df_read = df_read.sort_values('time').reset_index(drop=True) + df_sorted = df.sort_index() + df_sorted = convert_to_nullable_types(df_sorted.reset_index(drop=True)) + time_series = pd.Series(df.sort_index().index.values, dtype='Int64') + + assert df_read.shape == (50, 3) + assert df_read["time"].equals(time_series) + assert df_read["device"].equals(df_sorted["device"]) + assert df_read["value"].equals(df_sorted["value"]) + finally: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + +def test_to_tsfile_custom_time_column(): + """Test to_tsfile with custom time column name.""" + tsfile_path = "test_to_tsfile_custom_time.tsfile" + try: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + df = pd.DataFrame({ + 'timestamp': [i for i in range(30)], + 'device': [f"device{i}" for i in range(30)], + 'value': [i * 3.0 for i in range(30)] + }) + + to_tsfile(df, tsfile_path, table_name="test_table", time_column="timestamp") + + # Verify by reading back + 
df_read = to_dataframe(tsfile_path, table_name="test_table") + df_read = df_read.sort_values('time').reset_index(drop=True) + df_sorted = convert_to_nullable_types(df.sort_values('timestamp').reset_index(drop=True)) + + assert df_read.shape == (30, 3) + assert df_read["time"].equals(df_sorted["timestamp"]) + assert df_read["device"].equals(df_sorted["device"]) + assert df_read["value"].equals(df_sorted["value"]) + finally: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + +def test_to_tsfile_with_tag_columns(): + """Test to_tsfile with tag columns specified.""" + tsfile_path = "test_to_tsfile_tags.tsfile" + try: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + df = pd.DataFrame({ + 'time': [i for i in range(20)], + 'device': [f"device{i}" for i in range(20)], + 'location': [f"loc{i % 5}" for i in range(20)], + 'value': [i * 1.5 for i in range(20)] + }) + + to_tsfile(df, tsfile_path, table_name="test_table", tag_column=["device", "location"]) + + # Verify by reading back + df_read = to_dataframe(tsfile_path, table_name="test_table") + df_read = df_read.sort_values('time').reset_index(drop=True) + df_sorted = convert_to_nullable_types(df.sort_values('time').reset_index(drop=True)) + + assert df_read.shape == (20, 4) + assert df_read["device"].equals(df_sorted["device"]) + assert df_read["location"].equals(df_sorted["location"]) + assert df_read["value"].equals(df_sorted["value"]) + finally: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + +def test_to_tsfile_all_datatypes(): + """Test to_tsfile with all supported data types.""" + tsfile_path = "test_to_tsfile_all_types.tsfile" + try: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + df = pd.DataFrame({ + 'time': [i for i in range(50)], + 'bool_col': [i % 2 == 0 for i in range(50)], + 'int32_col': pd.Series([i for i in range(50)], dtype='int32'), + 'int64_col': [i * 10 for i in range(50)], + 'float_col': pd.Series([i * 1.5 for i in range(50)], dtype='float32'), + 
'double_col': [i * 2.5 for i in range(50)], + 'string_col': [f"str{i}" for i in range(50)], + 'blob_col': [f"blob{i}".encode('utf-8') for i in range(50)] + }) + + to_tsfile(df, tsfile_path, table_name="test_table") + + # Verify by reading back + df_read = to_dataframe(tsfile_path, table_name="test_table") + df_read = df_read.sort_values('time').reset_index(drop=True) + df_sorted = convert_to_nullable_types(df.sort_values('time').reset_index(drop=True)) + + assert df_read.shape == (50, 8) + assert df_read["bool_col"].equals(df_sorted["bool_col"]) + assert df_read["int32_col"].equals(df_sorted["int32_col"]) + assert df_read["int64_col"].equals(df_sorted["int64_col"]) + assert np.allclose(df_read["float_col"], df_sorted["float_col"]) + assert np.allclose(df_read["double_col"], df_sorted["double_col"]) + assert df_read["string_col"].equals(df_sorted["string_col"]) + # BLOB comparison + for i in range(50): + assert df_read["blob_col"].iloc[i] == df_sorted["blob_col"].iloc[i] + finally: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + +def test_to_tsfile_default_table_name(): + """Test to_tsfile with default table name.""" + tsfile_path = "test_to_tsfile_default_name.tsfile" + try: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + df = pd.DataFrame({ + 'time': [i for i in range(10)], + 'value': [i * 1.0 for i in range(10)] + }) + + to_tsfile(df, tsfile_path) # No table_name specified + + # Verify by reading back with default table name + df_read = to_dataframe(tsfile_path, table_name="table") + assert df_read.shape == (10, 2) + finally: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + +def test_to_tsfile_case_insensitive_time(): + """Test to_tsfile with case-insensitive time column.""" + tsfile_path = "test_to_tsfile_case_time.tsfile" + try: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + df = pd.DataFrame({ + 'Time': [i for i in range(20)], # Capital T + 'value': [i * 2.0 for i in range(20)] + }) + + to_tsfile(df, 
tsfile_path, table_name="test_table") + + # Verify by reading back + df_read = to_dataframe(tsfile_path, table_name="test_table") + assert df_read.shape == (20, 2) + assert df_read["time"].equals(pd.Series([i for i in range(20)], dtype='Int64')) + finally: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + +def test_to_tsfile_empty_dataframe(): + """Test to_tsfile raises error for empty DataFrame.""" + tsfile_path = "test_to_tsfile_empty.tsfile" + try: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + df = pd.DataFrame() + + with pytest.raises(ValueError, match="DataFrame cannot be None or empty"): + to_tsfile(df, tsfile_path) + finally: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + +def test_to_tsfile_no_data_columns(): + """Test to_tsfile raises error when only time column exists.""" + tsfile_path = "test_to_tsfile_no_data.tsfile" + try: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + df = pd.DataFrame({ + 'time': [i for i in range(10)] + }) + + with pytest.raises(ValueError, match="DataFrame must have at least one data column"): + to_tsfile(df, tsfile_path) + finally: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + +def test_to_tsfile_invalid_time_column(): + """Test to_tsfile raises error for invalid time column.""" + tsfile_path = "test_to_tsfile_invalid_time.tsfile" + try: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + df = pd.DataFrame({ + 'timestamp': [i for i in range(10)], + 'value': [i * 1.0 for i in range(10)] + }) + + # Time column doesn't exist + with pytest.raises(ValueError, match="Time column 'time' not found"): + to_tsfile(df, tsfile_path, time_column="time") + finally: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + +def test_to_tsfile_non_integer_time_column(): + """Test to_tsfile raises error for non-integer time column.""" + tsfile_path = "test_to_tsfile_non_int_time.tsfile" + try: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + 
+ df = pd.DataFrame({ + 'time': [f"time{i}" for i in range(10)], # String, not integer + 'value': [i * 1.0 for i in range(10)] + }) + + with pytest.raises(TypeError, match="must be integer type"): + to_tsfile(df, tsfile_path) + finally: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + +def test_to_tsfile_invalid_tag_column(): + """Test to_tsfile raises error for invalid tag column.""" + tsfile_path = "test_to_tsfile_invalid_tag.tsfile" + try: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + df = pd.DataFrame({ + 'time': [i for i in range(10)], + 'value': [i * 1.0 for i in range(10)] + }) + + with pytest.raises(ValueError, match="Tag column 'invalid' not found"): + to_tsfile(df, tsfile_path, tag_column=["invalid"]) + finally: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + +def test_to_tsfile_string_vs_blob(): + """Test to_tsfile correctly distinguishes between STRING and BLOB.""" + tsfile_path = "test_to_tsfile_string_blob.tsfile" + try: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + df = pd.DataFrame({ + 'time': [i for i in range(20)], + 'string_col': [f"str{i}" for i in range(20)], # String + 'blob_col': [f"blob{i}".encode('utf-8') for i in range(20)] # Bytes + }) + + to_tsfile(df, tsfile_path, table_name="test_table") + + # Verify by reading back + df_read = to_dataframe(tsfile_path, table_name="test_table") + df_read = df_read.sort_values('time').reset_index(drop=True) + df_sorted = convert_to_nullable_types(df.sort_values('time').reset_index(drop=True)) + + assert df_read["string_col"].equals(df_sorted["string_col"]) + for i in range(20): + assert df_read["blob_col"].iloc[i] == df_sorted["blob_col"].iloc[i] + finally: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) diff --git a/python/tests/test_write_and_read.py b/python/tests/test_write_and_read.py index ad8d698a..1ffc22b9 100644 --- a/python/tests/test_write_and_read.py +++ b/python/tests/test_write_and_read.py @@ -22,6 +22,7 @@ from 
datetime import date import numpy as np import pandas as pd import pytest +from pandas import Float64Dtype from pandas.core.dtypes.common import is_integer_dtype from tsfile import ColumnSchema, TableSchema, TSEncoding @@ -545,7 +546,7 @@ def test_tsfile_to_df(): assert df1.shape == (4097, 4) assert df1["value2"].sum() == 100 * (1 + 4096) / 2 * 4096 assert is_integer_dtype(df1["time"]) - assert df1["value"].dtype == np.float64 + assert df1["value"].dtype == Float64Dtype() assert is_integer_dtype(df1["value2"]) df2 = to_dataframe("table_write_to_df.tsfile", column_names=["device", "value2"]) assert df2.shape == (4097, 3) @@ -756,457 +757,9 @@ def test_tree_all_datatype_query_to_dataframe_variants(): pass finally: - if os.path.exists(tsfile_path): - os.remove(tsfile_path) - - -def test_table_all_datatype_query_to_dataframe_variants(): - tsfile_path = "test_table.tsfile" - table = TableSchema( - "test_table", - [ - ColumnSchema("Device1", TSDataType.STRING, ColumnCategory.TAG), - ColumnSchema("Device2", TSDataType.STRING, ColumnCategory.TAG), - ColumnSchema("Value1", TSDataType.BOOLEAN, ColumnCategory.FIELD), - ColumnSchema("Value2", TSDataType.INT32, ColumnCategory.FIELD), - ColumnSchema("Value3", TSDataType.INT64, ColumnCategory.FIELD), - ColumnSchema("Value4", TSDataType.FLOAT, ColumnCategory.FIELD), - ColumnSchema("Value5", TSDataType.DOUBLE, ColumnCategory.FIELD), - ColumnSchema("Value6", TSDataType.TEXT, ColumnCategory.FIELD), - ColumnSchema("Value7", TSDataType.STRING, ColumnCategory.FIELD), - ColumnSchema("Value8", TSDataType.BLOB, ColumnCategory.FIELD), - ColumnSchema("Value9", TSDataType.TIMESTAMP, ColumnCategory.FIELD), - ColumnSchema("Value10", TSDataType.DATE, ColumnCategory.FIELD), - ], - ) - dateSet = set() - try: - if os.path.exists(tsfile_path): - os.remove(tsfile_path) - max_row_num = 100 - with TsFileTableWriter(tsfile_path, table) as writer: - tablet = Tablet( - [ - "Device1", - "Device2", - "Value1", - "Value2", - "Value3", - "Value4", - "Value5", 
- "Value6", - "Value7", - "Value8", - "Value9", - "Value10", - ], - [ - TSDataType.STRING, - TSDataType.STRING, - TSDataType.BOOLEAN, - TSDataType.INT32, - TSDataType.INT64, - TSDataType.FLOAT, - TSDataType.DOUBLE, - TSDataType.TEXT, - TSDataType.STRING, - TSDataType.BLOB, - TSDataType.TIMESTAMP, - TSDataType.DATE, - ], - max_row_num, - ) - for i in range(max_row_num): - tablet.add_timestamp(i, i) - tablet.add_value_by_name("Device1", i, "d1_" + str(i)) - tablet.add_value_by_name("Device2", i, "d2_" + str(i)) - tablet.add_value_by_name("Value1", i, i % 2 == 0) - tablet.add_value_by_name("Value2", i, i * 3) - tablet.add_value_by_name("Value3", i, i * 4) - tablet.add_value_by_name("Value4", i, i * 5.5) - tablet.add_value_by_name("Value5", i, i * 6.6) - tablet.add_value_by_name("Value6", i, f"string_value_{i}") - tablet.add_value_by_name("Value7", i, f"text_value_{i}") - tablet.add_value_by_name("Value8", i, f"blob_data_{i}".encode('utf-8')) - tablet.add_value_by_name("Value9", i, i * 9) - tablet.add_value_by_name("Value10", i, date(2025, 1, i % 20 + 1)) - dateSet.add(date(2025, 1, i % 20 + 1)) - writer.write_table(tablet) - - df1_1 = to_dataframe(tsfile_path) - assert df1_1.shape[0] == max_row_num - for i in range(max_row_num): - assert df1_1.iloc[i, 1] == "d1_" + str(df1_1.iloc[i, 0]) - assert df1_1.iloc[i, 2] == "d2_" + str(df1_1.iloc[i, 0]) - - df2_1 = to_dataframe(tsfile_path, column_names=["Value1"]) - for i in range(max_row_num): - assert df2_1.iloc[i, 1] == np.bool_(df2_1.iloc[i, 0] % 2 == 0) - df2_2 = to_dataframe(tsfile_path, column_names=["Value2"]) - for i in range(max_row_num): - assert df2_2.iloc[i, 1] == np.int32(df2_2.iloc[i, 0] * 3) - df2_3 = to_dataframe(tsfile_path, column_names=["Value3"]) - for i in range(max_row_num): - assert df2_3.iloc[i, 1] == np.int64(df2_3.iloc[i, 0] * 4) - df2_4 = to_dataframe(tsfile_path, column_names=["Value4"]) - for i in range(max_row_num): - assert df2_4.iloc[i, 1] == np.float32(df2_4.iloc[i, 0] * 5.5) - df2_5 = 
to_dataframe(tsfile_path, column_names=["Value5"]) - for i in range(max_row_num): - assert df2_5.iloc[i, 1] == np.float64(df2_5.iloc[i, 0] * 6.6) - df2_6 = to_dataframe(tsfile_path, column_names=["Value6"]) - for i in range(max_row_num): - assert df2_6.iloc[i, 1] == f"string_value_{df2_6.iloc[i, 0]}" - df2_7 = to_dataframe(tsfile_path, column_names=["Value7"]) - for i in range(max_row_num): - assert df2_7.iloc[i, 1] == f"text_value_{df2_7.iloc[i, 0]}" - df2_8 = to_dataframe(tsfile_path, column_names=["Value8"]) - for i in range(max_row_num): - assert df2_8.iloc[i, 1] == f"blob_data_{df2_8.iloc[i, 0]}".encode('utf-8') - df2_9 = to_dataframe(tsfile_path, column_names=["Value9"]) - for i in range(max_row_num): - assert df2_9.iloc[i, 1] == np.int64(df2_9.iloc[i, 0] * 9) - df2_10 = to_dataframe(tsfile_path, column_names=["Value10"]) - for i in range(max_row_num): - assert df2_10.iloc[i, 1] in dateSet - df2_11 = to_dataframe(tsfile_path, column_names=["Device1", "Value1"]) - for i in range(max_row_num): - assert df2_11.iloc[i, 1] == "d1_" + str(df2_11.iloc[i, 0]) - assert df2_11.iloc[i, 2] == np.bool_(df2_11.iloc[i, 0] % 2 == 0) - df2_12 = to_dataframe( - tsfile_path, - column_names=[ - "Device1", - "Device2", - "Value1", - "Value2", - "Value3", - "Value4", - "Value5", - "Value6", - "Value7", - "Value8", - "Value9", - "Value10", - ], - ) - for i in range(max_row_num): - assert df2_12.iloc[i, 1] == "d1_" + str(df2_12.iloc[i, 0]) - assert df2_12.iloc[i, 2] == "d2_" + str(df2_12.iloc[i, 0]) - assert df2_12.iloc[i, 3] == np.bool_(df2_12.iloc[i, 0] % 2 == 0) - assert df2_12.iloc[i, 4] == np.int32(df2_12.iloc[i, 0] * 3) - assert df2_12.iloc[i, 5] == np.int64(df2_12.iloc[i, 0] * 4) - assert df2_12.iloc[i, 6] == np.float32(df2_12.iloc[i, 0] * 5.5) - assert df2_12.iloc[i, 7] == np.float64(df2_12.iloc[i, 0] * 6.6) - assert df2_12.iloc[i, 8] == f"string_value_{df2_12.iloc[i, 0]}" - assert df2_12.iloc[i, 9] == f"text_value_{df2_12.iloc[i, 0]}" - assert df2_12.iloc[i, 10] == 
f"blob_data_{df2_12.iloc[i, 0]}".encode( - "utf-8" - ) - assert df2_12.iloc[i, 11] == np.int64(df2_12.iloc[i, 0] * 9) - assert df2_12.iloc[i, 12] == date(2025, 1, df2_12.iloc[i, 0] % 20 + 1) - df2_13 = to_dataframe( - tsfile_path, column_names=["Device1", "Device2", "Value1"] - ) - for i in range(max_row_num): - assert df2_13.iloc[i, 1] == "d1_" + str(df2_13.iloc[i, 0]) - assert df2_13.iloc[i, 2] == "d2_" + str(df2_13.iloc[i, 0]) - assert df2_13.iloc[i, 3] == np.bool_(df2_13.iloc[i, 0] % 2 == 0) - - df3_1 = to_dataframe(tsfile_path, table_name="test_table") - assert df3_1.shape[0] == max_row_num - assert df3_1.iloc[0, 0] == 0 - df3_2 = to_dataframe(tsfile_path, table_name="TEST_TABLE") - assert df3_2.shape[0] == max_row_num - assert df3_2.iloc[0, 0] == 0 - - df4_1 = to_dataframe(tsfile_path, start_time=10) - assert df4_1.shape[0] == 90 - df4_2 = to_dataframe(tsfile_path, start_time=-10) - assert df4_2.shape[0] == max_row_num - df4_3 = to_dataframe(tsfile_path, end_time=5) - assert df4_3.shape[0] == 6 - df4_4 = to_dataframe(tsfile_path, end_time=-5) - assert df4_4.shape[0] == 0 - df4_5 = to_dataframe(tsfile_path, start_time=5, end_time=5) - assert df4_5.shape[0] == 1 - df4_6 = to_dataframe(tsfile_path, start_time=-5, end_time=-5) - assert df4_6.shape[0] == 0 - df4_7 = to_dataframe(tsfile_path, start_time=10, end_time=-10) - assert df4_7.shape[0] == 0 - df4_8 = to_dataframe(tsfile_path, start_time=-10, end_time=10) - assert df4_8.shape[0] == 11 - df4_8 = to_dataframe(tsfile_path, start_time=-50, end_time=50) - assert df4_8.shape[0] == 51 - - df5_1 = to_dataframe(tsfile_path, max_row_num=1) - assert df5_1.shape[0] == 1 - df5_2 = to_dataframe(tsfile_path, max_row_num=50) - assert df5_2.shape[0] == 50 - df5_3 = to_dataframe(tsfile_path, max_row_num=100) - assert df5_3.shape[0] == 100 - df5_4 = to_dataframe(tsfile_path, max_row_num=1000) - assert df5_4.shape[0] == 100 - df5_5 = to_dataframe(tsfile_path, max_row_num=0) - assert df5_5.shape[0] == 0 - df5_6 = 
to_dataframe(tsfile_path, max_row_num=-10) - assert df5_6.shape[0] == 0 - - for df6_1 in to_dataframe(tsfile_path, max_row_num=20, as_iterator=True): - assert df6_1.shape[0] == 20 - for df6_2 in to_dataframe(tsfile_path, max_row_num=1000, as_iterator=True): - assert df6_2.shape[0] == 100 - - for df7_1 in to_dataframe( - tsfile_path, - table_name="test_table", - column_names=["Device1", "Value1"], - start_time=21, - end_time=50, - max_row_num=10, - as_iterator=True, - ): - assert df7_1.shape[0] == 10 - for i in range(30): - assert df2_11.iloc[i, 1] == "d1_" + str(df2_11.iloc[i, 0]) - assert df2_11.iloc[i, 2] == np.bool_(df2_11.iloc[i, 0] % 2 == 0) - - try: - to_dataframe(tsfile_path, table_name="non_existent_table") - except TableNotExistError as e: - assert e.args[0] == "[non_existent_table] Requested table does not exist" - - try: - to_dataframe(tsfile_path, column_names=["non_existent_column"]) - except ColumnNotExistError as e: - assert e.args[0] == "[non_existent_column] Column does not exist" - - finally: - if os.path.exists(tsfile_path): - os.remove(tsfile_path) - - -def convert_to_nullable_types(df): - """ - Convert DataFrame columns to nullable types to match returned DataFrame from to_dataframe. - This handles the fact that returned DataFrames use nullable types (Int64, Float64, etc.) - to support Null values. 
- """ - df = df.copy() - for col in df.columns: - dtype = df[col].dtype - if dtype == 'int64': - df[col] = df[col].astype('Int64') - elif dtype == 'int32': - df[col] = df[col].astype('Int32') - elif dtype == 'float64': - df[col] = df[col].astype('Float64') - elif dtype == 'float32': - df[col] = df[col].astype('Float32') - elif dtype == 'bool': - df[col] = df[col].astype('boolean') - return df - - -def test_write_dataframe_basic(): - table = TableSchema("test_table", - [ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), - ColumnSchema("value", TSDataType.DOUBLE, ColumnCategory.FIELD), - ColumnSchema("value2", TSDataType.INT64, ColumnCategory.FIELD)]) - tsfile_path = "test_write_dataframe_basic.tsfile" - try: - if os.path.exists(tsfile_path): - os.remove(tsfile_path) - - with TsFileTableWriter(tsfile_path, table) as writer: - # Create DataFrame with 'time' column - df = pd.DataFrame({ - 'time': [i for i in range(100)], - 'device': [f"device{i}" for i in range(100)], - 'value': [i * 1.5 for i in range(100)], - 'value2': [i * 10 for i in range(100)] - }) - writer.write_dataframe(df) - - df_read = to_dataframe(tsfile_path, table_name="test_table") - # Sort both DataFrames by time for comparison - df_read = df_read.sort_values('time').reset_index(drop=True) - df_sorted = convert_to_nullable_types(df.sort_values('time').reset_index(drop=True)) - assert df_read.shape == (100, 4) - assert df_read["time"].equals(df_sorted["time"]) - assert df_read["device"].equals(df_sorted["device"]) - assert df_read["value"].equals(df_sorted["value"]) - assert df_read["value2"].equals(df_sorted["value2"]) - finally: - if os.path.exists(tsfile_path): - os.remove(tsfile_path) - - -def test_write_dataframe_with_index(): - """Test write_dataframe using DataFrame index as time when no 'time' column exists.""" - table = TableSchema("test_table", - [ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), - ColumnSchema("value", TSDataType.DOUBLE, ColumnCategory.FIELD)]) - 
tsfile_path = "test_write_dataframe_index.tsfile" - try: - if os.path.exists(tsfile_path): - os.remove(tsfile_path) - - with TsFileTableWriter(tsfile_path, table) as writer: - # Create DataFrame without 'time' column, use index - df = pd.DataFrame({ - 'device': [f"device{i}" for i in range(50)], - 'value': [i * 2.5 for i in range(50)] - }) - df.index = [i * 10 for i in range(50)] # Set index as timestamps - writer.write_dataframe(df) - - # Verify by reading back - df_read = to_dataframe(tsfile_path, table_name="test_table") - # Sort both DataFrames by time for comparison - df_read = df_read.sort_values('time').reset_index(drop=True) - df_sorted = df.sort_index() - df_sorted = convert_to_nullable_types(df_sorted.reset_index(drop=True)) - # Create time series from original index - time_series = pd.Series(df.sort_index().index.values, dtype='Int64') - assert df_read.shape == (50, 3) - # Compare time column with sorted index - assert df_read["time"].equals(time_series) - assert df_read["device"].equals(df_sorted["device"]) - assert df_read["value"].equals(df_sorted["value"]) - finally: - if os.path.exists(tsfile_path): - os.remove(tsfile_path) - - -def test_write_dataframe_case_insensitive(): - """Test write_dataframe with case-insensitive column matching.""" - table = TableSchema("test_table", - [ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), - ColumnSchema("value", TSDataType.DOUBLE, ColumnCategory.FIELD)]) - tsfile_path = "test_write_dataframe_case.tsfile" - try: - if os.path.exists(tsfile_path): - os.remove(tsfile_path) - - with TsFileTableWriter(tsfile_path, table) as writer: - # DataFrame with different case column names - df = pd.DataFrame({ - 'Time': [i for i in range(30)], # Capital T - 'Device': [f"device{i}" for i in range(30)], # Capital D - 'VALUE': [i * 3.0 for i in range(30)] # All caps - }) - writer.write_dataframe(df) - - # Verify by reading back - df_read = to_dataframe(tsfile_path, table_name="test_table") - # Sort both DataFrames by 
def test_write_dataframe_column_not_in_schema():
    """A DataFrame column absent from the table schema must raise ColumnNotExistError."""
    schema = TableSchema(
        "test_table",
        [ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
         ColumnSchema("value", TSDataType.DOUBLE, ColumnCategory.FIELD)])
    tsfile_path = "test_write_dataframe_extra_col.tsfile"
    try:
        if os.path.exists(tsfile_path):
            os.remove(tsfile_path)

        source = pd.DataFrame({
            'time': list(range(10)),
            'device': [f"device{i}" for i in range(10)],
            'value': [i * 1.0 for i in range(10)],
            'extra_column': list(range(10)),  # deliberately not in the schema
        })
        with TsFileTableWriter(tsfile_path, schema) as writer:
            with pytest.raises(ColumnNotExistError) as exc_info:
                writer.write_dataframe(source)
            # The offending column name should be reported in the message.
            assert "extra_column" in str(exc_info.value)
    finally:
        if os.path.exists(tsfile_path):
            os.remove(tsfile_path)


def test_write_dataframe_type_mismatch():
    """An int64 column written against a STRING field must raise TypeMismatchError."""
    schema = TableSchema(
        "test_table",
        [ColumnSchema("value", TSDataType.STRING, ColumnCategory.FIELD)])
    tsfile_path = "test_write_dataframe_type_mismatch.tsfile"
    try:
        if os.path.exists(tsfile_path):
            os.remove(tsfile_path)

        source = pd.DataFrame({
            'time': list(range(10)),
            'value': list(range(10)),  # int64, but the schema expects STRING
        })
        with TsFileTableWriter(tsfile_path, schema) as writer:
            with pytest.raises(TypeMismatchError) as exc_info:
                writer.write_dataframe(source)
            assert "Type mismatches" in str(exc_info.value)
    finally:
        if os.path.exists(tsfile_path):
            os.remove(tsfile_path)
def test_write_dataframe_all_datatypes():
    """Round-trip every supported field data type through write_dataframe."""
    schema = TableSchema(
        "test_table",
        [ColumnSchema("bool_col", TSDataType.BOOLEAN, ColumnCategory.FIELD),
         ColumnSchema("int32_col", TSDataType.INT32, ColumnCategory.FIELD),
         ColumnSchema("int64_col", TSDataType.INT64, ColumnCategory.FIELD),
         ColumnSchema("float_col", TSDataType.FLOAT, ColumnCategory.FIELD),
         ColumnSchema("double_col", TSDataType.DOUBLE, ColumnCategory.FIELD),
         ColumnSchema("string_col", TSDataType.STRING, ColumnCategory.FIELD),
         ColumnSchema("blob_col", TSDataType.BLOB, ColumnCategory.FIELD)])
    tsfile_path = "test_write_dataframe_all_types.tsfile"
    try:
        if os.path.exists(tsfile_path):
            os.remove(tsfile_path)

        source = pd.DataFrame({
            'time': list(range(50)),
            'bool_col': [i % 2 == 0 for i in range(50)],
            'int32_col': pd.Series(range(50), dtype='int32'),
            'int64_col': [i * 10 for i in range(50)],
            'float_col': pd.Series([i * 1.5 for i in range(50)], dtype='float32'),
            'double_col': [i * 2.5 for i in range(50)],
            'string_col': [f"str{i}" for i in range(50)],
            'blob_col': [f"blob{i}".encode('utf-8') for i in range(50)],
        })
        with TsFileTableWriter(tsfile_path, schema) as writer:
            writer.write_dataframe(source)

        # Read back and compare after sorting both frames by time.
        result = to_dataframe(tsfile_path, table_name="test_table")
        result = result.sort_values('time').reset_index(drop=True)
        expected = convert_to_nullable_types(source.sort_values('time').reset_index(drop=True))
        assert result.shape == (50, 8)
        assert result["bool_col"].equals(expected["bool_col"])
        assert result["int32_col"].equals(expected["int32_col"])
        assert result["int64_col"].equals(expected["int64_col"])
        # Floating-point columns: compare numerically rather than with .equals().
        assert np.allclose(result["float_col"], expected["float_col"])
        assert np.allclose(result["double_col"], expected["double_col"])
        assert result["string_col"].equals(expected["string_col"])
        # BLOB values compare element-wise as bytes.
        for row in range(50):
            assert result["blob_col"].iloc[row] == expected["blob_col"].iloc[row]
    finally:
        if os.path.exists(tsfile_path):
            os.remove(tsfile_path)
""" if dataframe is None or dataframe.empty: - pass + raise ValueError("DataFrame cannot be None or empty") # Create mapping from lowercase column name to original column name df_column_name_map = {col.lower(): col for col in dataframe.columns if col.lower() != 'time'} diff --git a/python/tsfile/utils.py b/python/tsfile/utils.py index a8ed5601..a19b7e0e 100644 --- a/python/tsfile/utils.py +++ b/python/tsfile/utils.py @@ -221,7 +221,7 @@ def to_tsfile(dataframe: pd.DataFrame, raise ValueError(f"Time column '{time_column}' not found in DataFrame") # Check if time column is integer type (int64 or int) if not is_integer_dtype(dataframe[time_column].dtype): - raise ValueError(f"Time column '{time_column}' must be integer type (int64 or int), got {dataframe[time_column].dtype}") + raise TypeError(f"Time column '{time_column}' must be integer type (int64 or int), got {dataframe[time_column].dtype}") time_col_name = time_column else: # Look for 'time' column (case-insensitive) @@ -231,6 +231,8 @@ def to_tsfile(dataframe: pd.DataFrame, if is_integer_dtype(dataframe[col].dtype): time_col_name = col break + else: + raise TypeError(f"Time column '{col}' must be integer type (int64 or int), got {dataframe[col].dtype}") # Get data columns (excluding time column) data_columns = [col for col in dataframe.columns if col != time_col_name] @@ -252,6 +254,15 @@ def to_tsfile(dataframe: pd.DataFrame, # Infer data type from DataFrame column col_dtype = dataframe[col_name].dtype ts_data_type = TSDataType.from_pandas_datatype(col_dtype) + + # If inferred type is STRING but dtype is object, check actual data to distinguish STRING vs BLOB + if ts_data_type == TSDataType.STRING and (col_dtype == 'object' or str(col_dtype) == "<class 'numpy.object_'>"): + column_series = dataframe[col_name] + first_valid_idx = column_series.first_valid_index() + if first_valid_idx is not None: + first_value = column_series[first_valid_idx] + if isinstance(first_value, bytes): + ts_data_type = TSDataType.BLOB 
# Determine category (TAG or FIELD) if col_name.lower() in tag_columns_lower:
