jt2594838 commented on code in PR #388: URL: https://github.com/apache/tsfile/pull/388#discussion_r1959166443
########## python/examples/example.py: ########## Review Comment: Add an example using the latest interfaces. ########## python/tsfile/field.py: ########## @@ -0,0 +1,205 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from .constants import TSDataType +from .date_utils import parse_int_to_date +import numpy as np +import pandas as pd + + +class Field(object): + def __init__(self, field_name, data_type, value=None): + """ + :param data_type: TSDataType + """ + self.__data_type = data_type + self.value = value + if not isinstance(value, data_type.to_py_type()): + raise TypeError(f"Expected {data_type.to_py_type()} got {type(value)}") + self.field_name = field_name + + def get_data_type(self): + return self.__data_type + + def get_field_name(self): + return self.field_name + + def is_null(self): + return self.__data_type is None or self.value is None or self.value is pd.NA + + def set_bool_value(self, value: bool): + self.value = value + + def get_bool_value(self): + if ( + self.__data_type != TSDataType.BOOLEAN + or self.value is None + or self.value is pd.NA + ): + return None + return self.value + + def set_int_value(self, value: int): + if isinstance(value, int): + raise TypeError(f"Expected int got {type(value)}") + if 
not np.iinfo(np.int32).min <= value <= np.iinfo(np.int32).max: + raise OverflowError(f"data:{value} out of range of int32") + self.value = value + + def get_int_value(self): + if ( + self.__data_type != TSDataType.INT32 + and self.__data_type != TSDataType.DATE + or self.value is None + or self.value is pd.NA + ): + return None + return np.int32(self.value) + + def set_long_value(self, value: int): + if isinstance(value, int): + raise TypeError(f"Expected int got {type(value)}") + + if not np.iinfo(np.int64).min <= value <= np.iinfo(np.int64).max: + raise OverflowError(f"data:{value} out of range of int64") + self.value = value + + def get_long_value(self): + if self.__data_type is None: + raise Exception("Null Field Exception!") Review Comment: This check is not added for get_int_value and get_bool_value, why? ########## python/tsfile/schema.py: ########## @@ -0,0 +1,73 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +from typing import List + +from .constants import TSDataType, Category, TSEncoding, Compressor + +class TimeseriesSchema: + timeseries_name = None + data_type = None + encoding_type = None + compression_type = None + + def __init__(self, timeseries_name : str, data_type : TSDataType, encoding_type : TSEncoding = None, compression_type : Compressor = None): + self.timeseries_name = timeseries_name + self.data_type = data_type + self.encoding_type = encoding_type if encoding_type is not None else TSEncoding.PLAIN + self.compression_type = compression_type if compression_type is not None else Compressor.UNCOMPRESSED + +class DeviceSchema: + device_name = None + timeseries_list = None + def __init__(self, device_name : str, timeseries_list : List[TimeseriesSchema]): + self.device_name = device_name + self.timeseries_list = timeseries_list + +class ColumnSchema: + column_name = None + data_type = None + category = None + def __init__(self, column_name : str, data_type : TSDataType, category : Category): + self.column_name = column_name + self.data_type = data_type + self.category = category + + +class TableSchema: + table_name = None + columns = None + def __init__(self, table_name : str, columns : List[ColumnSchema]): + self.table_name = table_name + self.columns = columns + +class ResultSetMetaData: + column_list = None + data_types = None + device_name = None Review Comment: device_name -> table_name ########## python/tsfile/tablet.py: ########## @@ -0,0 +1,153 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import math +import struct +from enum import unique, IntEnum +from typing import List, Union + +import numpy as np + +from .date_utils import parse_date_to_int +from .constants import TSDataType, Category + + +class Tablet(object): + + def __init__(self, device_id: str, column_name_list: list[str], type_list: list[TSDataType], + max_row_num: int = 1024): + self.timestamp_list = [None for _ in range(max_row_num)] + self.data_list: List[List[Union[int, float, bool, str, bytes, None]]] = [ + [None for _ in range(max_row_num)] for _ in range(len(column_name_list)) + ] + self.device_id = device_id + self.column_name_list = column_name_list + self.type_list = type_list + self.max_row_num = max_row_num + + self._type_ranges = { + TSDataType.INT32: (-2 ** 31, 2 ** 31 - 1), + TSDataType.INT64: (-2 ** 63, 2 ** 63 - 1), + TSDataType.FLOAT: (np.finfo(np.float32).min, np.finfo(np.float32).max), + TSDataType.DOUBLE: (np.finfo(np.float64).min, np.finfo(np.float64).max), + } + + def _check_index(self, col_index : int, row_index : int): + if not (0 <= col_index < len(self.column_name_list)): + raise IndexError(f"column index {col_index} out of range [0, {len(self.column_name_list) - 1}]") + + if not (0 <= row_index < self.max_row_num): + raise IndexError(f"Row index {row_index} out of range [0, {self.max_row_num - 1}]") + + def get_column_name_list(self): + return self.column_name_list + + def get_data_type_list(self): + return self.type_list + + def get_timestamp_list(self): + return self.timestamp_list + + def get_device_id(self): + return self.device_id + + def 
get_value_list(self): + return self.data_list + + def get_max_row_num(self): + return self.max_row_num + + def add_column(self, column_name: str, column_type: TSDataType): + self.column_name_list.append(column_name) + self.type_list.append(column_type) + + def remove_column(self, column_name: str): + ind = self.column_name_list.index(column_name) + self.column_name_list.remove(column_name) + self.type_list.remove(self.type_list[ind]) + + def set_timestamp_list(self, timestamp_list: list[int]): + self.timestamp_list = timestamp_list + + def add_timestamp(self, row_index: int, timestamp: int): + self.timestamp_list[row_index] = timestamp + + def set_timestamp(self, row_index: int, timestamp: int): + self.timestamp_list[row_index] = timestamp + + def _check_numeric_range(self, value: Union[int, float], data_type: TSDataType): + min_val, max_val = self._type_ranges[data_type] + if math.isinf(value): + raise ValueError( + f"{data_type}.name not support inf" + ) + if not (min_val <= value <= max_val): + raise OverflowError(f"data:{value} out of range ({min_val}, {max_val})") Review Comment: Well, actually you can write INF to TsFile using other languages (and even NaN). So, this check is not precise. ########## python/tsfile/tsfile_cpp.pxd: ########## @@ -0,0 +1,173 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +# + +#cython: language_level=3 +from libc.stdint cimport uint32_t, int32_t, int64_t + +ctypedef int32_t ErrorCode + +# import symbols from tsfile_cwrapper.h +cdef extern from "./tsfile_cwrapper.h": + # common + ctypedef int64_t timestamp + + # reader and writer etc + ctypedef void* TsFileReader + ctypedef void* TsFileWriter + ctypedef void* Tablet + ctypedef void* TsRecord + ctypedef void* ResultSet + + # enum types + ctypedef enum TSDataType: + TS_DATATYPE_BOOLEAN = 0 + TS_DATATYPE_INT32 = 1 + TS_DATATYPE_INT64 = 2 + TS_DATATYPE_FLOAT = 3 + TS_DATATYPE_DOUBLE = 4 + TS_DATATYPE_TEXT = 5 + TS_DATATYPE_VECTOR = 6 + TS_DATATYPE_NULL_TYPE = 254 + TS_DATATYPE_INVALID = 255 + + ctypedef enum TSEncoding: + TS_ENCODING_PLAIN = 0, + TS_ENCODING_DICTIONARY = 1, + TS_ENCODING_RLE = 2, + TS_ENCODING_DIFF = 3, + TS_ENCODING_TS_2DIFF = 4, + TS_ENCODING_BITMAP = 5, + TS_ENCODING_GORILLA_V1 = 6, + TS_ENCODING_REGULAR = 7, + TS_ENCODING_GORILLA = 8, + TS_ENCODING_ZIGZAG = 9, + TS_ENCODING_FREQ = 10, + TS_ENCODING_INVALID = 255 + + ctypedef enum CompressionType: + TS_COMPRESSION_UNCOMPRESSED = 0, + TS_COMPRESSION_SNAPPY = 1, + TS_COMPRESSION_GZIP = 2, + TS_COMPRESSION_LZO = 3, + TS_COMPRESSION_SDT = 4, + TS_COMPRESSION_PAA = 5, + TS_COMPRESSION_PLA = 6, + TS_COMPRESSION_LZ4 = 7, + TS_COMPRESSION_INVALID = 255 + + ctypedef enum ColumnCategory: + TAG = 0, + FIELD = 1 + + # struct types + ctypedef struct ColumnSchema: + char* column_name + TSDataType data_type + ColumnCategory column_category + + ctypedef struct TableSchema: + char* table_name + ColumnSchema* column_schemas + int column_num + + ctypedef struct TimeseriesSchema: + char* timeseries_name + TSDataType data_type + TSEncoding encoding + CompressionType compression + + ctypedef struct DeviceSchema: + char* device_name + TimeseriesSchema* timeseries_schema + int timeseries_num + + ctypedef struct ResultSetMetaData: + 
char** column_names + TSDataType* data_types + int column_num + + + + # Function Declarations + + # reader:new and close + TsFileReader tsfile_reader_new(const char* pathname, ErrorCode* err_code); + ErrorCode tsfile_reader_close(TsFileReader reader) + + # writer: new and close + TsFileWriter tsfile_writer_new(const char * pathname, ErrorCode* err_code); + ErrorCode tsfile_writer_close(TsFileWriter writer) + + # writer : register table, device and timeseries + ErrorCode tsfile_writer_register_table(TsFileWriter writer, TableSchema* schema); + ErrorCode tsfile_writer_register_timeseries(TsFileWriter writer, const char* device_name, + TimeseriesSchema* schema); + ErrorCode tsfile_writer_register_device(TsFileWriter writer, DeviceSchema* device_schema); + + # writer : write tablet data and flush + ErrorCode tsfile_writer_write_tablet(TsFileWriter writer, Tablet tablet); + ErrorCode tsfile_writer_write_ts_record(TsFileWriter writer, TsRecord record); + + ErrorCode tsfile_writer_flush_data(TsFileWriter writer); + + # tablet : new and add timestamp/value into tablet + Tablet tablet_new_with_device(const char* device_id, const char** column_name_list, TSDataType* data_types, + int column_num, int max_rows); + Tablet tablet_new(const char** column_names, TSDataType* data_types, int column_num); + + ErrorCode tablet_add_timestamp(Tablet tablet, uint32_t row_index, int64_t timestamp); + ErrorCode tablet_add_value_by_index_int64_t(Tablet tablet, uint32_t row_index, uint32_t column_index, int64_t value); + ErrorCode tablet_add_value_by_index_int32_t(Tablet tablet, uint32_t row_index, uint32_t column_index, int32_t value); + ErrorCode tablet_add_value_by_index_double(Tablet tablet, uint32_t row_index, uint32_t column_index, double value); + ErrorCode tablet_add_value_by_index_float(Tablet tablet, uint32_t row_index, uint32_t column_index, float value); + ErrorCode tablet_add_value_by_index_bool(Tablet tablet, uint32_t row_index, uint32_t column_index, bint value); + + void 
free_tablet(Tablet* tablet); + + # row_record + TsRecord ts_record_new(const char * device_id, int64_t timestamp, int timeseries_num); + ErrorCode insert_data_into_ts_record_by_name_int32_t(TsRecord data, const char *measurement_name, int32_t value); + ErrorCode insert_data_into_ts_record_by_name_int64_t(TsRecord data, const char *measurement_name, int64_t value); + ErrorCode insert_data_into_ts_record_by_name_float(TsRecord data, const char *measurement_name, float value); + ErrorCode insert_data_into_ts_record_by_name_double(TsRecord data, const char *measurement_name, double value); + ErrorCode insert_data_into_ts_record_by_name_bool(TsRecord data, const char *measurement_name, bint value); Review Comment: I guess almost every parameter can be const. ########## python/tsfile/tablet.py: ########## @@ -0,0 +1,153 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +import math +import struct +from enum import unique, IntEnum +from typing import List, Union + +import numpy as np + +from .date_utils import parse_date_to_int +from .constants import TSDataType, Category + + +class Tablet(object): + + def __init__(self, device_id: str, column_name_list: list[str], type_list: list[TSDataType], + max_row_num: int = 1024): + self.timestamp_list = [None for _ in range(max_row_num)] + self.data_list: List[List[Union[int, float, bool, str, bytes, None]]] = [ + [None for _ in range(max_row_num)] for _ in range(len(column_name_list)) + ] + self.device_id = device_id + self.column_name_list = column_name_list + self.type_list = type_list + self.max_row_num = max_row_num + + self._type_ranges = { + TSDataType.INT32: (-2 ** 31, 2 ** 31 - 1), + TSDataType.INT64: (-2 ** 63, 2 ** 63 - 1), + TSDataType.FLOAT: (np.finfo(np.float32).min, np.finfo(np.float32).max), + TSDataType.DOUBLE: (np.finfo(np.float64).min, np.finfo(np.float64).max), + } + + def _check_index(self, col_index : int, row_index : int): + if not (0 <= col_index < len(self.column_name_list)): + raise IndexError(f"column index {col_index} out of range [0, {len(self.column_name_list) - 1}]") + + if not (0 <= row_index < self.max_row_num): + raise IndexError(f"Row index {row_index} out of range [0, {self.max_row_num - 1}]") + + def get_column_name_list(self): + return self.column_name_list + + def get_data_type_list(self): + return self.type_list + + def get_timestamp_list(self): + return self.timestamp_list + + def get_device_id(self): + return self.device_id + + def get_value_list(self): + return self.data_list + + def get_max_row_num(self): + return self.max_row_num + + def add_column(self, column_name: str, column_type: TSDataType): + self.column_name_list.append(column_name) + self.type_list.append(column_type) + + def remove_column(self, column_name: str): + ind = self.column_name_list.index(column_name) + self.column_name_list.remove(column_name) + 
self.type_list.remove(self.type_list[ind]) + + def set_timestamp_list(self, timestamp_list: list[int]): + self.timestamp_list = timestamp_list + + def add_timestamp(self, row_index: int, timestamp: int): + self.timestamp_list[row_index] = timestamp + + def set_timestamp(self, row_index: int, timestamp: int): + self.timestamp_list[row_index] = timestamp + + def _check_numeric_range(self, value: Union[int, float], data_type: TSDataType): + min_val, max_val = self._type_ranges[data_type] + if math.isinf(value): + raise ValueError( + f"{data_type}.name not support inf" + ) + if not (min_val <= value <= max_val): + raise OverflowError(f"data:{value} out of range ({min_val}, {max_val})") + + def add_value_by_name(self, column_name: str, row_index: int, value: Union[int, float, bool, str, bytes]): + try: + col_index = self.column_name_list.index(column_name) + except ValueError: + raise ValueError(f"Column '{column_name}' does not exist") from None Review Comment: ValueError does not seem very precise for this situation. ########## python/tsfile/tsfile_py_cpp.pyx: ########## @@ -0,0 +1,384 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +from cpython.exc cimport PyErr_SetObject +from cpython.ref cimport PyObject +from tsfile.exceptions import ( + LibraryError, + OOMError, + AlreadyExistsError, + InvalidQueryError, + get_exception, + ERROR_MAPPING + ) + +#cython: language_level=3 +from .tsfile_cpp cimport * + +from libc.stdlib cimport free +from libc.stdlib cimport malloc +from libc.string cimport strdup + +from cpython.object cimport PyObject +from cpython.bytes cimport PyBytes_AsString +from cpython.unicode cimport PyUnicode_AsUTF8String, PyUnicode_AsUTF8 + +from tsfile.schema import ResultSetMetaData as ResultSetMetaDataPy +from tsfile.schema import TSDataType as TSDataTypePy, TSEncoding as TSEncodingPy +from tsfile.schema import Compressor as CompressorPy, Category as CategoryPy + +cdef inline void check_error(int errcode, const char* context=NULL) except *: + cdef: + object exc_type + object exc_instance + + if errcode == 0: + return + + exc_type = ERROR_MAPPING.get(errcode) + exc_instance = exc_type(errcode, "") + PyErr_SetObject(exc_type, exc_instance) + +# convert from c to python +cdef object from_c_result_set_meta_data(ResultSetMetaData schema): + column_list = [] + data_types = [] + column_num = schema.column_num + + for i in range(column_num): + column_list.append(schema.column_names[i].decode('utf-8')) + data_types.append(TSDataTypePy(schema.data_types[i])) + result = ResultSetMetaDataPy(column_list, data_types) + return result + +# Convert from python to c struct +cdef dict TS_DATA_TYPE_MAP = { + TSDataTypePy.BOOLEAN: TSDataType.TS_DATATYPE_BOOLEAN, + TSDataTypePy.INT32: TSDataType.TS_DATATYPE_INT32, + TSDataTypePy.INT64: TSDataType.TS_DATATYPE_INT64, + TSDataTypePy.FLOAT: TSDataType.TS_DATATYPE_FLOAT, + TSDataTypePy.DOUBLE: TSDataType.TS_DATATYPE_DOUBLE, + TSDataTypePy.TEXT: TSDataType.TS_DATATYPE_TEXT, +} + +cdef dict TS_ENCODING_MAP = { + TSEncodingPy.PLAIN: TSEncoding.TS_ENCODING_PLAIN, + TSEncodingPy.DICTIONARY: TSEncoding.TS_ENCODING_DICTIONARY, + TSEncodingPy.RLE: 
TSEncoding.TS_ENCODING_RLE, + TSEncodingPy.DIFF: TSEncoding.TS_ENCODING_DIFF, + TSEncodingPy.TS_2DIFF: TSEncoding.TS_ENCODING_TS_2DIFF, + TSEncodingPy.BITMAP: TSEncoding.TS_ENCODING_BITMAP, + TSEncodingPy.GORILLA_V1: TSEncoding.TS_ENCODING_GORILLA_V1, + TSEncodingPy.REGULAR: TSEncoding.TS_ENCODING_REGULAR, + TSEncodingPy.GORILLA: TSEncoding.TS_ENCODING_GORILLA, + TSEncodingPy.ZIGZAG: TSEncoding.TS_ENCODING_ZIGZAG, +} + +cdef dict COMPRESSION_TYPE_MAP = { + CompressorPy.UNCOMPRESSED: CompressionType.TS_COMPRESSION_UNCOMPRESSED, + CompressorPy.SNAPPY: CompressionType.TS_COMPRESSION_SNAPPY, + CompressorPy.GZIP: CompressionType.TS_COMPRESSION_GZIP, + CompressorPy.LZO: CompressionType.TS_COMPRESSION_LZO, + CompressorPy.SDT: CompressionType.TS_COMPRESSION_SDT, + CompressorPy.PAA: CompressionType.TS_COMPRESSION_PAA, + CompressorPy.PLA: CompressionType.TS_COMPRESSION_PLA, + CompressorPy.LZ4: CompressionType.TS_COMPRESSION_LZ4, +} + +cdef TSDataType to_c_data_type(object data_type): + try: + return TS_DATA_TYPE_MAP[data_type] + except KeyError: + raise ValueError(f"Unsupported Python TSDataType: {data_type}") + +cdef TSEncoding to_c_encoding_type(object encoding_type): + try: + return TS_ENCODING_MAP[encoding_type] + except KeyError: + raise ValueError(f"Unsupported Python TSEncoding: {encoding_type}") + +cdef CompressionType to_c_compression_type(object compression_type): + try: + return COMPRESSION_TYPE_MAP[compression_type] + except KeyError: + raise ValueError(f"Unsupported Python Compressor: {compression_type}") + +cdef ColumnCategory to_c_category(object category): + if category == CategoryPy.TAG: + return <ColumnCategory> ColumnCategory.TAG + if category == CategoryPy.FIELD: + return <ColumnCategory> ColumnCategory.FIELD + +cdef TimeseriesSchema* to_c_timeseries_schema(object py_schema): + cdef TimeseriesSchema* c_schema + c_schema = <TimeseriesSchema *> malloc(sizeof(TimeseriesSchema)) + c_schema.timeseries_name = strdup(py_schema.timeseries_name.encode('utf-8')) + 
if py_schema.data_type is not None: + c_schema.data_type = to_c_data_type(py_schema.data_type) + else: + raise ValueError("data_type cannot be None") + if py_schema.encoding_type is not None: + c_schema.encoding = to_c_encoding_type(py_schema.encoding_type) + else: + raise ValueError("encoding_type cannot be None") + if py_schema.compression_type is not None: + c_schema.compression = to_c_compression_type(py_schema.compression_type) + else: + raise ValueError("compression_type cannot be None") + return c_schema + + + +cdef DeviceSchema* to_c_device_schema(object py_schema): + cdef DeviceSchema* c_schema + c_schema = <DeviceSchema *> malloc(sizeof(DeviceSchema)) + c_schema.device_name = strdup(py_schema.device_name.encode('utf-8')) + c_schema.timeseries_num = len(py_schema.timeseries_list) + c_schema.timeseries_schema = <TimeseriesSchema *> malloc(c_schema.timeseries_num * sizeof(TimeseriesSchema)) + for i in range(c_schema.timeseries_num): + c_schema.timeseries_schema[i].timeseries_name = strdup(py_schema.timeseries_list[i].timeseries_name.encode('utf-8')) + c_schema.timeseries_schema[i].data_type = to_c_data_type(py_schema.timeseries_list[i].data_type) + c_schema.timeseries_schema[i].encoding = to_c_encoding_type(py_schema.timeseries_list[i].encoding_type) + c_schema.timeseries_schema[i].compression = to_c_compression_type(py_schema.timeseries_list[i].compression_type) + return c_schema + +cdef ColumnSchema* to_c_column_schema(object py_schema): + cdef ColumnSchema* c_schema + c_schema = <ColumnSchema*> malloc(sizeof(ColumnSchema)) + c_schema.data_type = to_c_data_type(py_schema.data_type) + c_schema.column_category = py_schema.category + c_schema.column_name = strdup(py_schema.column_name.encode('utf-8')) + return c_schema + +cdef TableSchema* to_c_table_schema(object py_schema): + cdef TableSchema* c_schema + c_schema = <TableSchema *> malloc(sizeof(TableSchema)) + c_schema.table_name = strdup(py_schema.table_name.encode('utf-8')) + c_schema.column_num = 
len(py_schema.columns) + c_schema.column_schemas = <ColumnSchema *> malloc(c_schema.column_num * sizeof(ColumnSchema)) + for i in range(c_schema.column_num): + c_schema.column_schemas[i].column_name = strdup(py_schema.columns[i].column_name) + c_schema.column_schemas[i].column_category = to_c_category(py_schema.columns[i].category) + c_schema.column_schemas[i].data_type = to_c_data_type(py_schema.columns[i].data_type) + return c_schema + + +cdef Tablet to_c_tablet(object tablet): + cdef Tablet ctablet + cdef int max_row_num + cdef TSDataType data_type + cdef int64_t timestamp + cdef bytes device_id_bytes = PyUnicode_AsUTF8String(tablet.get_device_id()) + cdef const char * device_id_c = device_id_bytes + + + column_num = len(tablet.get_column_name_list()) + columns_names = <char**> malloc(sizeof(char *) * column_num) + columns_types = <TSDataType *> malloc(sizeof(TSDataType) * column_num) + for i in range(column_num): + columns_names[i] = strdup(tablet.get_column_name_list()[i].encode('utf-8')) + columns_types[i] = to_c_data_type(tablet.get_data_type_list()[i]) + + max_row_num = tablet.get_max_row_num(); + + ctablet = tablet_new_with_device(device_id_c, columns_names, columns_types, column_num, + max_row_num) + free(columns_types) + for i in range(column_num): + free(columns_names[i]) + free(columns_names) + + for row in range(max_row_num): + timestamp_py = tablet.get_timestamp_list()[row] + if timestamp_py is None: + continue + timestamp = timestamp_py + tablet_add_timestamp(ctablet, row, timestamp) + + for col in range(column_num): + data_type = to_c_data_type(tablet.get_data_type_list()[col]) + value = tablet.get_value_list()[col] + # BOOLEAN + if data_type == TS_DATATYPE_BOOLEAN: + for row in range(max_row_num): + if value[row] is not None: + tablet_add_value_by_index_bool(ctablet, row, col, value[row]) + # INT32 + elif data_type == TS_DATATYPE_INT32: + for row in range(max_row_num): + if value[row] is not None: + tablet_add_value_by_index_int32_t(ctablet, row, 
col, value[row]) + + # INT64 + elif data_type == TS_DATATYPE_INT64: + for row in range(max_row_num): + if value[row] is not None: + tablet_add_value_by_index_int64_t(ctablet, row, col, value[row]) + # FLOAT + elif data_type == TS_DATATYPE_FLOAT: + for row in range(max_row_num): + if value[row] is not None: + tablet_add_value_by_index_float(ctablet, row, col, value[row]) + + # DOUBLE + elif data_type == TS_DATATYPE_DOUBLE: + for row in range(max_row_num): + if value[row] is not None: + tablet_add_value_by_index_double(ctablet, row, col, value[row]) + + return ctablet + + +cdef TsRecord to_c_record(object row_record): + cdef int field_num = row_record.get_fields_num() + cdef int64_t timestamp = <int64_t>row_record.get_timestamp() + cdef bytes device_id_bytes = PyUnicode_AsUTF8String(row_record.get_device_id()) + cdef const char* device_id = device_id_bytes + cdef TsRecord record + cdef int i + cdef TSDataType data_type + record = ts_record_new(device_id, timestamp, field_num) + for i in range(field_num): + field = row_record.get_fields()[i] + data_type = to_c_data_type(field.get_data_type()) + if data_type == TS_DATATYPE_BOOLEAN: + insert_data_into_ts_record_by_name_bool(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_bool_value()) + elif data_type == TS_DATATYPE_INT32: + insert_data_into_ts_record_by_name_int32_t(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_int_value()) + elif data_type == TS_DATATYPE_INT64: + insert_data_into_ts_record_by_name_int64_t(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_long_value()) + elif data_type == TS_DATATYPE_DOUBLE: + insert_data_into_ts_record_by_name_double(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_double_value()) + elif data_type == TS_DATATYPE_FLOAT: + insert_data_into_ts_record_by_name_float(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_float_value()) + + return record + +# Free c structs' space +cdef void free_c_table_schema(TableSchema* c_schema): + 
free(c_schema.table_name) + for i in range(c_schema.column_num): + free_c_column_schema(&(c_schema.column_schemas[i])) + free(c_schema.column_schemas) + +cdef void free_c_column_schema(ColumnSchema* c_schema): + free(c_schema.column_name) + +cdef void free_c_timeseries_schema(TimeseriesSchema* c_schema): + free(c_schema.timeseries_name) + +cdef void free_c_device_schema(DeviceSchema* c_schema): + free(c_schema.device_name) + for i in range(c_schema.timeseries_num): + free_c_timeseries_schema(&(c_schema.timeseries_schema[i])) + free(c_schema.timeseries_schema) + +cdef void free_c_tablet(Tablet tablet): + free_tablet(&tablet) + +cdef void free_c_row_record(TsRecord record): + free_tsfile_ts_record(&record) + +# Reader and writer new. +cdef TsFileWriter tsfile_writer_new_c(object pathname) except +: + cdef ErrorCode errno = 0 + cdef TsFileWriter writer + cdef bytes encoded_path = PyUnicode_AsUTF8String(pathname) + cdef const char* c_path = encoded_path + writer = tsfile_writer_new(c_path, &errno) + check_error(errno) + return writer + +cdef TsFileReader tsfile_reader_new_c(object pathname) except +: + cdef ErrorCode errno = 0 + cdef TsFileReader reader + cdef bytes encoded_path = PyUnicode_AsUTF8String(pathname) + cdef const char* c_path = encoded_path + reader = tsfile_reader_new(c_path, &errno) + check_error(errno) + return reader + +# Register table and device +cdef ErrorCode tsfile_writer_register_device_py_cpp(TsFileWriter writer, DeviceSchema *schema): + cdef ErrorCode errno + errno = tsfile_writer_register_device(writer, schema) + return errno + +cdef ErrorCode tsfile_writer_register_timeseries_py_cpp(TsFileWriter writer, object device_name, + TimeseriesSchema *schema): + cdef ErrorCode errno + cdef bytes encoded_device_name = PyUnicode_AsUTF8String(device_name) + cdef const char* c_device_name = encoded_device_name + errno = tsfile_writer_register_timeseries(writer, c_device_name, schema) + return errno + +cdef ErrorCode 
tsfile_writer_register_table_py_cpp(TsFileWriter writer, TableSchema *schema): + cdef ErrorCode errno + errno = tsfile_writer_register_table(writer, schema) + return errno + +cdef bint tsfile_result_set_is_null_by_name_c(ResultSet result_set, object name): + cdef bytes encoded_name = PyUnicode_AsUTF8String(name) + cdef const char* c_name = encoded_name + return tsfile_result_set_is_null_by_name(result_set, c_name) + +cdef ResultSet tsfile_reader_query_table_c(TsFileReader reader, object table_name, object column_list, + int64_t start_time, int64_t end_time): + cdef ResultSet result + cdef int column_num = len(column_list) + cdef bytes table_name_bytes = PyUnicode_AsUTF8String(table_name) + cdef const char* table_name_c = table_name_bytes + cdef char** columns = <char**> malloc(sizeof(char*) * column_num) + cdef int i + if columns == NULL: + raise MemoryError("Failed to allocate memory for columns") + try: + for i in range(column_num): + columns[i] = strdup((<str>column_list[i]).encode('utf-8')) + if columns[i] == NULL: + raise MemoryError("Failed to allocate memory for column name") + result = tsfile_reader_query_table(reader, table_name_c, columns, column_num, start_time, end_time) + return result + except Exception as e: + for i in range(column_num): + free(<void*>columns[i]) + free(<void*>columns) + raise e Review Comment: Columns are not freed if no exception? ########## python/tests/test_write.py: ########## Review Comment: Add tests using the latest interfaces. ########## python/tsfile/constants.py: ########## @@ -0,0 +1,81 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from enum import unique, IntEnum + +@unique +class TSDataType(IntEnum): + BOOLEAN = 0 + INT32 = 1 + INT64 = 2 + FLOAT = 3 + DOUBLE = 4 + TEXT = 5 + TIMESTAMP = 8 + DATE = 9 + BLOB = 10 + STRING = 11 + + def to_py_type(self): + if self == TSDataType.BOOLEAN: + return bool + elif self == TSDataType.INT32: + return int + elif self == TSDataType.INT64: + return int + elif self == TSDataType.FLOAT: + return float + elif self == TSDataType.DOUBLE: + return float + elif self == TSDataType.TEXT: + return str Review Comment: `TSDataType.STRING` should also map to `str` here (STRING -> str). ########## python/tsfile/exceptions.py: ########## @@ -0,0 +1,144 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License.
+# + +class LibraryError(Exception): + _default_message = "Unknown error occurred" + _default_code = -1 + + def __init__(self, code=None, context=None): + self.code = code if code is not None else self._default_code + self.message = context if context is not None else self._default_message + super().__init__(f"[{code}] {self.message}") + def __str__(self): + return f"{self.code}: {self.message}" + +class OOMError(LibraryError): + _default_message = "Out of memory" + _default_code = 1 + +class NotExistsError(LibraryError): + _default_message = "Requested resource does not exist" + _default_code = 2 + +class AlreadyExistsError(LibraryError): + _default_message = "Resource already exists" + _default_code = 3 + +class InvalidArgumentError(LibraryError): + _default_message = "Invalid argument provided" + _default_code = 4 + +class OutOfRangeError(LibraryError): + _default_message = "Value out of valid range" + _default_code = 5 + +class PartialReadError(LibraryError): + _default_message = "Incomplete data read operation" + _default_code = 6 + +class FileOpenError(LibraryError): + _default_message = "Failed to open file" + _default_code = 28 + +class FileCloseError(LibraryError): + _default_message = "Failed to close file" + _default_code = 29 + +class FileWriteError(LibraryError): + _default_message = "Failed to write to file" + _default_code = 30 + +class FileReadError(LibraryError): + _default_message = "Failed to read from file" + _default_code = 31 + +class FileSyncError(LibraryError): + _default_message = "Failed to sync file contents" + _default_code = 32 + +class MetadataError(LibraryError): + _default_message = "Metadata inconsistency detected" + _default_code = 33 + +class BufferNotEnoughError(LibraryError): + _default_message = "Insufficient buffer space" + _default_code = 36 + +class DeviceNotExistError(LibraryError): + _default_message = "Requested device does not exist" + _default_code = 44 + +class MeasurementNotExistError(LibraryError): + _default_message 
= "Specified measurement does not exist" + _default_code = 45 + +class InvalidQueryError(LibraryError): + _default_message = "Malformed query syntax" + _default_code = 46 + +class CompressionError(LibraryError): + _default_message = "Data compression/decompression failed" + _default_code = 48 + +class TableNotExistError(LibraryError): + _default_message = "Requested table does not exist" + _default_code = 49 + + +class TypeNotSupportedError(LibraryError): + _default_message = "Unsupported data type" + _default_code = 26 + +class TypeMismatchError(LibraryError): + _default_message = "Data type mismatch" + _default_code = 27 + +ERROR_MAPPING = { + 1: OOMError, + 2: NotExistsError, + 3: AlreadyExistsError, + 4: InvalidArgumentError, + 5: OutOfRangeError, + 6: PartialReadError, + 26: TypeNotSupportedError, + 27: TypeMismatchError, + 28: FileOpenError, + 29: FileCloseError, + 30: FileWriteError, + 31: FileReadError, + 32: FileSyncError, + 33: MetadataError, + 36: BufferNotEnoughError, + 44: DeviceNotExistError, + 45: MeasurementNotExistError, + 46: InvalidQueryError, + 48: CompressionError, + 49: TableNotExistError, +} + +def get_exception(code : int, context : str = None): + if code == 0: + return None + + exc_type = ERROR_MAPPING.get(code) + if not exc_type: + return LibraryError(code=code, message=f"Unmapped error code: {code}") Review Comment: Also include `context`. ########## python/tsfile/field.py: ########## @@ -0,0 +1,205 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from .constants import TSDataType +from .date_utils import parse_int_to_date +import numpy as np +import pandas as pd + + +class Field(object): + def __init__(self, field_name, data_type, value=None): + """ + :param data_type: TSDataType + """ + self.__data_type = data_type + self.value = value + if not isinstance(value, data_type.to_py_type()): + raise TypeError(f"Expected {data_type.to_py_type()} got {type(value)}") + self.field_name = field_name + + def get_data_type(self): + return self.__data_type + + def get_field_name(self): + return self.field_name + + def is_null(self): + return self.__data_type is None or self.value is None or self.value is pd.NA + + def set_bool_value(self, value: bool): + self.value = value + + def get_bool_value(self): + if ( + self.__data_type != TSDataType.BOOLEAN + or self.value is None + or self.value is pd.NA + ): + return None + return self.value + + def set_int_value(self, value: int): + if isinstance(value, int): + raise TypeError(f"Expected int got {type(value)}") Review Comment: Should be `not isinstance`? The same below. ########## python/tsfile/tsfile_reader.pyx: ########## @@ -0,0 +1,148 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +#cython: language_level=3 + +from .tsfile_cpp cimport * +from .tsfile_py_cpp cimport * + +from tsfile.schema import TSDataType as TSDataTypePy +from cpython.bytes cimport PyBytes_AsString +from cpython.unicode cimport PyUnicode_AsUTF8String +from typing import List + + +cdef class ResultSetPy: + """ + Get data from a query result. + """ + cdef ResultSet result + cdef ResultSetMetaData metadata_c + cdef public object metadata Review Comment: Is it necessary to keep both? Can metadata_c be freed right after metadata is created? ########## python/tsfile/field.py: ########## @@ -0,0 +1,205 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +from .constants import TSDataType +from .date_utils import parse_int_to_date +import numpy as np +import pandas as pd + + +class Field(object): + def __init__(self, field_name, data_type, value=None): + """ + :param data_type: TSDataType + """ + self.__data_type = data_type + self.value = value + if not isinstance(value, data_type.to_py_type()): + raise TypeError(f"Expected {data_type.to_py_type()} got {type(value)}") + self.field_name = field_name + + def get_data_type(self): + return self.__data_type + + def get_field_name(self): + return self.field_name + + def is_null(self): + return self.__data_type is None or self.value is None or self.value is pd.NA + + def set_bool_value(self, value: bool): + self.value = value + + def get_bool_value(self): + if ( + self.__data_type != TSDataType.BOOLEAN + or self.value is None + or self.value is pd.NA + ): + return None + return self.value + + def set_int_value(self, value: int): + if isinstance(value, int): + raise TypeError(f"Expected int got {type(value)}") + if not np.iinfo(np.int32).min <= value <= np.iinfo(np.int32).max: + raise OverflowError(f"data:{value} out of range of int32") + self.value = value + + def get_int_value(self): + if ( + self.__data_type != TSDataType.INT32 + and self.__data_type != TSDataType.DATE + or self.value is None + or self.value is pd.NA + ): + return None + return np.int32(self.value) + + def set_long_value(self, value: int): + if isinstance(value, int): + raise TypeError(f"Expected int got {type(value)}") + + if not np.iinfo(np.int64).min <= value <= np.iinfo(np.int64).max: + raise OverflowError(f"data:{value} out of range of int64") + self.value = value + + def get_long_value(self): + if self.__data_type is None: + raise Exception("Null Field Exception!") + if ( + self.__data_type != TSDataType.INT64 + and self.__data_type != TSDataType.TIMESTAMP + or self.value is None + or self.value is pd.NA Review Comment: Maybe you can support get_long_value for INT32. 
########## python/tsfile/date_utils.py: ########## @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from datetime import date + + +class DateTimeParseException(Exception): + pass + + +def parse_int_to_date(date_int: int) -> date: + try: + year = date_int // 10000 + month = (date_int // 100) % 100 + day = date_int % 100 + return date(year, month, day) + except ValueError as e: + raise DateTimeParseException("Invalid date format.") from e + + +def parse_date_to_int(local_date: date) -> int: + if local_date is None: + raise DateTimeParseException("Date expression is none or empty.") + if local_date.year < 1000: + raise DateTimeParseException("Year must be between 1000 and 9999.") Review Comment: The message says the year must be between 1000 and 9999, so this condition should also reject `local_date.year > 9999`. ########## python/tsfile/field.py: ########## @@ -0,0 +1,205 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from .constants import TSDataType +from .date_utils import parse_int_to_date +import numpy as np +import pandas as pd + + +class Field(object): + def __init__(self, field_name, data_type, value=None): + """ + :param data_type: TSDataType + """ + self.__data_type = data_type + self.value = value + if not isinstance(value, data_type.to_py_type()): + raise TypeError(f"Expected {data_type.to_py_type()} got {type(value)}") + self.field_name = field_name + + def get_data_type(self): + return self.__data_type + + def get_field_name(self): + return self.field_name + + def is_null(self): + return self.__data_type is None or self.value is None or self.value is pd.NA + + def set_bool_value(self, value: bool): + self.value = value + + def get_bool_value(self): + if ( + self.__data_type != TSDataType.BOOLEAN + or self.value is None + or self.value is pd.NA + ): + return None + return self.value + + def set_int_value(self, value: int): + if isinstance(value, int): + raise TypeError(f"Expected int got {type(value)}") + if not np.iinfo(np.int32).min <= value <= np.iinfo(np.int32).max: + raise OverflowError(f"data:{value} out of range of int32") + self.value = value + + def get_int_value(self): + if ( + self.__data_type != TSDataType.INT32 + and self.__data_type != TSDataType.DATE + or self.value is None + or self.value is pd.NA + ): + return None + return np.int32(self.value) + + def set_long_value(self, value: int): + if isinstance(value, int): + raise TypeError(f"Expected int got {type(value)}") + + if not np.iinfo(np.int64).min <= value <= np.iinfo(np.int64).max: 
+ raise OverflowError(f"data:{value} out of range of int64") + self.value = value + + def get_long_value(self): + if self.__data_type is None: + raise Exception("Null Field Exception!") + if ( + self.__data_type != TSDataType.INT64 + and self.__data_type != TSDataType.TIMESTAMP + or self.value is None + or self.value is pd.NA + ): + return None + return np.int64(self.value) + + def set_float_value(self, value: float): + if isinstance(value, float): + raise TypeError(f"Expected float got {type(value)}") + if not np.finfo(np.float32).min <= value <= np.finfo(np.float32).max: + raise OverflowError(f"data:{value} out of range of float32") + self.value = value + + def get_float_value(self): + if self.__data_type is None: + raise Exception("Null Field Exception!") Review Comment: Create a specific Exception class for this? ########## python/tsfile/field.py: ########## @@ -0,0 +1,205 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +from .constants import TSDataType +from .date_utils import parse_int_to_date +import numpy as np +import pandas as pd + + +class Field(object): + def __init__(self, field_name, data_type, value=None): + """ + :param data_type: TSDataType + """ + self.__data_type = data_type + self.value = value + if not isinstance(value, data_type.to_py_type()): + raise TypeError(f"Expected {data_type.to_py_type()} got {type(value)}") + self.field_name = field_name + + def get_data_type(self): + return self.__data_type + + def get_field_name(self): + return self.field_name + + def is_null(self): + return self.__data_type is None or self.value is None or self.value is pd.NA + + def set_bool_value(self, value: bool): + self.value = value + + def get_bool_value(self): + if ( + self.__data_type != TSDataType.BOOLEAN + or self.value is None + or self.value is pd.NA + ): + return None + return self.value + + def set_int_value(self, value: int): + if isinstance(value, int): + raise TypeError(f"Expected int got {type(value)}") + if not np.iinfo(np.int32).min <= value <= np.iinfo(np.int32).max: + raise OverflowError(f"data:{value} out of range of int32") + self.value = value + + def get_int_value(self): + if ( + self.__data_type != TSDataType.INT32 + and self.__data_type != TSDataType.DATE + or self.value is None + or self.value is pd.NA + ): + return None + return np.int32(self.value) + + def set_long_value(self, value: int): + if isinstance(value, int): + raise TypeError(f"Expected int got {type(value)}") + + if not np.iinfo(np.int64).min <= value <= np.iinfo(np.int64).max: + raise OverflowError(f"data:{value} out of range of int64") + self.value = value + + def get_long_value(self): + if self.__data_type is None: + raise Exception("Null Field Exception!") + if ( + self.__data_type != TSDataType.INT64 + and self.__data_type != TSDataType.TIMESTAMP + or self.value is None + or self.value is pd.NA + ): + return None + return np.int64(self.value) + + def set_float_value(self, value: 
float): + if isinstance(value, float): + raise TypeError(f"Expected float got {type(value)}") + if not np.finfo(np.float32).min <= value <= np.finfo(np.float32).max: + raise OverflowError(f"data:{value} out of range of float32") + self.value = value + + def get_float_value(self): + if self.__data_type is None: + raise Exception("Null Field Exception!") + if ( + self.__data_type != TSDataType.FLOAT + or self.value is None + or self.value is pd.NA + ): + return None + return np.float32(self.value) + + def set_double_value(self, value: float): + if isinstance(value, float): + raise TypeError(f"Expected float got {type(value)}") + if not np.finfo(np.float64).min <= value <= np.finfo(np.float64).max: + raise OverflowError(f"data:{value} out of range of float64") + self.value = value + + def get_double_value(self): + if self.__data_type is None: + raise Exception("Null Field Exception!") + if ( + self.__data_type != TSDataType.DOUBLE + or self.value is None + or self.value is pd.NA Review Comment: Maybe we can support INT32, INT64, and FLOAT here. ########## python/tsfile/schema.py: ########## @@ -0,0 +1,73 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +from typing import List + +from .constants import TSDataType, Category, TSEncoding, Compressor + +class TimeseriesSchema: + timeseries_name = None + data_type = None + encoding_type = None + compression_type = None + + def __init__(self, timeseries_name : str, data_type : TSDataType, encoding_type : TSEncoding = None, compression_type : Compressor = None): + self.timeseries_name = timeseries_name + self.data_type = data_type + self.encoding_type = encoding_type if encoding_type is not None else TSEncoding.PLAIN + self.compression_type = compression_type if compression_type is not None else Compressor.UNCOMPRESSED Review Comment: How about just, encoding_type : TSEncoding = TSEncoding.PLAIN ########## python/tsfile/field.py: ########## @@ -0,0 +1,205 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +from .constants import TSDataType +from .date_utils import parse_int_to_date +import numpy as np +import pandas as pd + + +class Field(object): + def __init__(self, field_name, data_type, value=None): + """ + :param data_type: TSDataType + """ + self.__data_type = data_type + self.value = value + if not isinstance(value, data_type.to_py_type()): + raise TypeError(f"Expected {data_type.to_py_type()} got {type(value)}") + self.field_name = field_name + + def get_data_type(self): + return self.__data_type + + def get_field_name(self): + return self.field_name + + def is_null(self): + return self.__data_type is None or self.value is None or self.value is pd.NA + + def set_bool_value(self, value: bool): + self.value = value + + def get_bool_value(self): + if ( + self.__data_type != TSDataType.BOOLEAN + or self.value is None + or self.value is pd.NA + ): + return None + return self.value + + def set_int_value(self, value: int): + if isinstance(value, int): + raise TypeError(f"Expected int got {type(value)}") + if not np.iinfo(np.int32).min <= value <= np.iinfo(np.int32).max: + raise OverflowError(f"data:{value} out of range of int32") + self.value = value + + def get_int_value(self): + if ( + self.__data_type != TSDataType.INT32 + and self.__data_type != TSDataType.DATE + or self.value is None + or self.value is pd.NA + ): + return None + return np.int32(self.value) + + def set_long_value(self, value: int): + if isinstance(value, int): + raise TypeError(f"Expected int got {type(value)}") + + if not np.iinfo(np.int64).min <= value <= np.iinfo(np.int64).max: + raise OverflowError(f"data:{value} out of range of int64") + self.value = value + + def get_long_value(self): + if self.__data_type is None: + raise Exception("Null Field Exception!") + if ( + self.__data_type != TSDataType.INT64 + and self.__data_type != TSDataType.TIMESTAMP + or self.value is None + or self.value is pd.NA + ): + return None + return np.int64(self.value) + + def set_float_value(self, value: 
float): + if isinstance(value, float): + raise TypeError(f"Expected float got {type(value)}") + if not np.finfo(np.float32).min <= value <= np.finfo(np.float32).max: + raise OverflowError(f"data:{value} out of range of float32") + self.value = value + + def get_float_value(self): + if self.__data_type is None: + raise Exception("Null Field Exception!") + if ( + self.__data_type != TSDataType.FLOAT + or self.value is None + or self.value is pd.NA Review Comment: Maybe we can support INT32 here. ########## python/tsfile/field.py: ########## @@ -0,0 +1,205 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +from .constants import TSDataType +from .date_utils import parse_int_to_date +import numpy as np +import pandas as pd + + +class Field(object): + def __init__(self, field_name, data_type, value=None): + """ + :param data_type: TSDataType + """ + self.__data_type = data_type + self.value = value + if not isinstance(value, data_type.to_py_type()): + raise TypeError(f"Expected {data_type.to_py_type()} got {type(value)}") + self.field_name = field_name + + def get_data_type(self): + return self.__data_type + + def get_field_name(self): + return self.field_name + + def is_null(self): + return self.__data_type is None or self.value is None or self.value is pd.NA + + def set_bool_value(self, value: bool): + self.value = value + + def get_bool_value(self): + if ( + self.__data_type != TSDataType.BOOLEAN + or self.value is None + or self.value is pd.NA + ): + return None + return self.value + + def set_int_value(self, value: int): + if isinstance(value, int): + raise TypeError(f"Expected int got {type(value)}") + if not np.iinfo(np.int32).min <= value <= np.iinfo(np.int32).max: + raise OverflowError(f"data:{value} out of range of int32") + self.value = value + + def get_int_value(self): + if ( + self.__data_type != TSDataType.INT32 + and self.__data_type != TSDataType.DATE + or self.value is None + or self.value is pd.NA + ): + return None + return np.int32(self.value) + + def set_long_value(self, value: int): + if isinstance(value, int): + raise TypeError(f"Expected int got {type(value)}") + + if not np.iinfo(np.int64).min <= value <= np.iinfo(np.int64).max: + raise OverflowError(f"data:{value} out of range of int64") + self.value = value + + def get_long_value(self): + if self.__data_type is None: + raise Exception("Null Field Exception!") + if ( + self.__data_type != TSDataType.INT64 + and self.__data_type != TSDataType.TIMESTAMP + or self.value is None + or self.value is pd.NA + ): + return None + return np.int64(self.value) + + def set_float_value(self, value: 
float): + if isinstance(value, float): + raise TypeError(f"Expected float got {type(value)}") + if not np.finfo(np.float32).min <= value <= np.finfo(np.float32).max: + raise OverflowError(f"data:{value} out of range of float32") + self.value = value + + def get_float_value(self): + if self.__data_type is None: + raise Exception("Null Field Exception!") + if ( + self.__data_type != TSDataType.FLOAT + or self.value is None + or self.value is pd.NA + ): + return None + return np.float32(self.value) + + def set_double_value(self, value: float): + if isinstance(value, float): + raise TypeError(f"Expected float got {type(value)}") + if not np.finfo(np.float64).min <= value <= np.finfo(np.float64).max: + raise OverflowError(f"data:{value} out of range of float64") + self.value = value + + def get_double_value(self): + if self.__data_type is None: + raise Exception("Null Field Exception!") + if ( + self.__data_type != TSDataType.DOUBLE + or self.value is None + or self.value is pd.NA + ): + return None + return np.float64(self.value) + + def set_binary_value(self, value: bytes): + self.value = value + + def get_binary_value(self): + if self.__data_type is None: + raise Exception("Null Field Exception!") + if ( + self.__data_type != TSDataType.TEXT + and self.__data_type != TSDataType.STRING + and self.__data_type != TSDataType.BLOB + or self.value is None + or self.value is pd.NA + ): + return None + return self.value + + def get_date_value(self): + if self.__data_type is None: + raise Exception("Null Field Exception!") + if ( + self.__data_type != TSDataType.DATE + or self.value is None + or self.value is pd.NA + ): + return None + return parse_int_to_date(self.value) + + def get_string_value(self): + if self.__data_type is None or self.value is None or self.value is pd.NA: + return "None" + # TEXT, STRING + elif self.__data_type == TSDataType.TEXT or self.__data_type == TSDataType.STRING: + return self.value.decode("utf-8") + # BLOB + elif self.__data_type == 
TSDataType.BLOB: + return str(hex(int.from_bytes(self.value, byteorder="big"))) + else: + return str(self.get_object_value(self.__data_type)) + + def __str__(self): + return self.get_string_value() + + def get_object_value(self, data_type: TSDataType): + """ + :param data_type: TSDataType + """ + if self.__data_type is None or self.value is None or self.value is pd.NA: + return None + if data_type == TSDataType.BOOLEAN: + return bool(self.value) + elif data_type == TSDataType.INT32: + return np.int32(self.value) + elif data_type == TSDataType.INT64 or data_type == TSDataType.TIMESTAMP: + return np.int64(self.value) + elif data_type == TSDataType.FLOAT: + return np.float32(self.value) + elif data_type == TSDataType.DOUBLE: + return np.float64(self.value) + elif data_type == TSDataType.DATE: + return parse_int_to_date(self.value) + elif data_type == TSDataType.TEXT or data_type == TSDataType.BLOB or data_type == TSDataType.STRING: + return self.value + else: + raise RuntimeError("Unsupported data type:" + str(data_type)) + + @staticmethod + def get_field(field_name, value, data_type): + """ + :param field_name: field's name + :param value: field value corresponding to the data type + :param data_type: TSDataType + """ + if value is None or value is pd.NA: + return None Review Comment: Also check data_type? ########## python/tsfile/tablet.py: ########## @@ -0,0 +1,153 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import math +import struct +from enum import unique, IntEnum +from typing import List, Union + +import numpy as np + +from .date_utils import parse_date_to_int +from .constants import TSDataType, Category + + +class Tablet(object): + + def __init__(self, device_id: str, column_name_list: list[str], type_list: list[TSDataType], + max_row_num: int = 1024): + self.timestamp_list = [None for _ in range(max_row_num)] + self.data_list: List[List[Union[int, float, bool, str, bytes, None]]] = [ + [None for _ in range(max_row_num)] for _ in range(len(column_name_list)) + ] + self.device_id = device_id + self.column_name_list = column_name_list + self.type_list = type_list + self.max_row_num = max_row_num + + self._type_ranges = { + TSDataType.INT32: (-2 ** 31, 2 ** 31 - 1), + TSDataType.INT64: (-2 ** 63, 2 ** 63 - 1), + TSDataType.FLOAT: (np.finfo(np.float32).min, np.finfo(np.float32).max), + TSDataType.DOUBLE: (np.finfo(np.float64).min, np.finfo(np.float64).max), + } + + def _check_index(self, col_index : int, row_index : int): + if not (0 <= col_index < len(self.column_name_list)): + raise IndexError(f"column index {col_index} out of range [0, {len(self.column_name_list) - 1}]") + + if not (0 <= row_index < self.max_row_num): + raise IndexError(f"Row index {row_index} out of range [0, {self.max_row_num - 1}]") + + def get_column_name_list(self): + return self.column_name_list + + def get_data_type_list(self): + return self.type_list + + def get_timestamp_list(self): + return self.timestamp_list + + def get_device_id(self): + return self.device_id + + def 
get_value_list(self): + return self.data_list + + def get_max_row_num(self): + return self.max_row_num + + def add_column(self, column_name: str, column_type: TSDataType): + self.column_name_list.append(column_name) + self.type_list.append(column_type) + + def remove_column(self, column_name: str): + ind = self.column_name_list.index(column_name) + self.column_name_list.remove(column_name) + self.type_list.remove(self.type_list[ind]) + + def set_timestamp_list(self, timestamp_list: list[int]): + self.timestamp_list = timestamp_list + + def add_timestamp(self, row_index: int, timestamp: int): + self.timestamp_list[row_index] = timestamp + + def set_timestamp(self, row_index: int, timestamp: int): + self.timestamp_list[row_index] = timestamp Review Comment: Just keep one of them, to avoid confusion. ########## python/tsfile/tablet.py: ########## @@ -0,0 +1,153 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +import math +import struct +from enum import unique, IntEnum +from typing import List, Union + +import numpy as np + +from .date_utils import parse_date_to_int +from .constants import TSDataType, Category + + +class Tablet(object): + + def __init__(self, device_id: str, column_name_list: list[str], type_list: list[TSDataType], + max_row_num: int = 1024): + self.timestamp_list = [None for _ in range(max_row_num)] + self.data_list: List[List[Union[int, float, bool, str, bytes, None]]] = [ + [None for _ in range(max_row_num)] for _ in range(len(column_name_list)) + ] + self.device_id = device_id + self.column_name_list = column_name_list + self.type_list = type_list + self.max_row_num = max_row_num + + self._type_ranges = { + TSDataType.INT32: (-2 ** 31, 2 ** 31 - 1), + TSDataType.INT64: (-2 ** 63, 2 ** 63 - 1), + TSDataType.FLOAT: (np.finfo(np.float32).min, np.finfo(np.float32).max), + TSDataType.DOUBLE: (np.finfo(np.float64).min, np.finfo(np.float64).max), + } + + def _check_index(self, col_index : int, row_index : int): + if not (0 <= col_index < len(self.column_name_list)): + raise IndexError(f"column index {col_index} out of range [0, {len(self.column_name_list) - 1}]") + + if not (0 <= row_index < self.max_row_num): + raise IndexError(f"Row index {row_index} out of range [0, {self.max_row_num - 1}]") + + def get_column_name_list(self): + return self.column_name_list + + def get_data_type_list(self): + return self.type_list + + def get_timestamp_list(self): + return self.timestamp_list + + def get_device_id(self): + return self.device_id + + def get_value_list(self): + return self.data_list + + def get_max_row_num(self): + return self.max_row_num + + def add_column(self, column_name: str, column_type: TSDataType): + self.column_name_list.append(column_name) + self.type_list.append(column_type) + + def remove_column(self, column_name: str): + ind = self.column_name_list.index(column_name) + self.column_name_list.remove(column_name) + 
self.type_list.remove(self.type_list[ind]) + + def set_timestamp_list(self, timestamp_list: list[int]): + self.timestamp_list = timestamp_list + + def add_timestamp(self, row_index: int, timestamp: int): + self.timestamp_list[row_index] = timestamp + + def set_timestamp(self, row_index: int, timestamp: int): + self.timestamp_list[row_index] = timestamp + + def _check_numeric_range(self, value: Union[int, float], data_type: TSDataType): + min_val, max_val = self._type_ranges[data_type] + if math.isinf(value): + raise ValueError( + f"{data_type}.name not support inf" + ) + if not (min_val <= value <= max_val): + raise OverflowError(f"data:{value} out of range ({min_val}, {max_val})") + + def add_value_by_name(self, column_name: str, row_index: int, value: Union[int, float, bool, str, bytes]): + try: + col_index = self.column_name_list.index(column_name) + except ValueError: + raise ValueError(f"Column '{column_name}' does not exist") from None + + if not (0 <= row_index < self.max_row_num): + raise IndexError( + f"Row index {row_index} out of range [0, {self.max_row_num - 1}]" + ) + + expected_type = self.type_list[col_index] + + if not isinstance(value, expected_type.to_py_type()): + raise TypeError(f"Expected {expected_type.to_py_type()} got {type(value)}") + + self._check_numeric_range(value, expected_type) + + self.data_list[col_index][row_index] = value + + def add_value_by_index(self, col_index: int, row_index: int, value: Union[int, float, bool, str, bytes]): + self._check_index(col_index, row_index) + expected_type = self.type_list[col_index] + if not isinstance(value, expected_type.to_py_type()): + raise TypeError(f"Expected {expected_type.to_py_type()} got {type(value)}") + + self._check_numeric_range(value, expected_type) + + self.data_list[col_index][row_index] = value + + def get_value_by_index(self, col_index: int, row_index: int): + self._check_index(col_index, row_index) + return self.data_list[col_index][row_index] Review Comment: Why is index not 
checked in this method? ########## python/tsfile/tsfile_reader.pyx: ########## @@ -0,0 +1,148 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +#cython: language_level=3 + +from .tsfile_cpp cimport * +from .tsfile_py_cpp cimport * + +from tsfile.schema import TSDataType as TSDataTypePy +from cpython.bytes cimport PyBytes_AsString +from cpython.unicode cimport PyUnicode_AsUTF8String +from typing import List + + +cdef class ResultSetPy: + """ + Get data from a query result. + """ + cdef ResultSet result + cdef ResultSetMetaData metadata_c + cdef public object metadata + cdef public object device_name + + cdef init_c(self, ResultSet result, object device_name): + """ + Init c symbols. + """ + self.result = result + self.metadata_c = tsfile_result_set_get_metadata(self.result) + self.metadata = from_c_result_set_meta_data(self.metadata_c) + self.metadata.set_device_name(device_name) + + def has_next(self): + """ + Check if the query has next rows. + """ + return tsfile_result_set_has_next(self.result) Review Comment: Where are `next()` and `close()`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
