This is an automated email from the ASF dual-hosted git repository.

colinlee pushed a commit to branch colin_refine_write
in repository https://gitbox.apache.org/repos/asf/tsfile.git


The following commit(s) were added to refs/heads/colin_refine_write by this push:
     new 81a83ab7 add batch read.
81a83ab7 is described below

commit 81a83ab7f85a0e7e6c921437d861097cfb107abe
Author: ColinLee <[email protected]>
AuthorDate: Fri Apr 25 15:18:22 2025 +0800

    add batch read.
---
 cpp/src/common/container/bit_map.h  |   5 ++
 cpp/src/common/tablet.cc            |  10 +--
 cpp/src/common/tablet.h             |  23 +++---
 cpp/src/cwrapper/tsfile_cwrapper.cc |  15 +++-
 cpp/src/cwrapper/tsfile_cwrapper.h  |   3 +-
 cpp/test/common/tablet_test.cc      |  13 +++-
 python/setup.py                     |   2 +-
 python/tsfile/tsfile_cpp.pxd        |   6 +-
 python/tsfile/tsfile_writer.pyx     | 136 ++++++++++++++++++++++++++++++++++++
 9 files changed, 185 insertions(+), 28 deletions(-)

diff --git a/cpp/src/common/container/bit_map.h b/cpp/src/common/container/bit_map.h
index c2f8f4e5..5a2b2f26 100644
--- a/cpp/src/common/container/bit_map.h
+++ b/cpp/src/common/container/bit_map.h
@@ -59,6 +59,18 @@ class BitMap {
         memset(bitmap_, 0x00, size_);
     }
 
+    /**
+     * @brief Overwrites the whole bitmap with `size_` bytes copied from
+     *        `bitmap`, which must hold at least that many bytes.
+     *        A null `bitmap` leaves the current contents unchanged.
+     */
+    FORCE_INLINE void set_bitmap(const char* bitmap) {
+        if (bitmap == nullptr) {
+            return;
+        }
+        memcpy(bitmap_, bitmap, size_);
+    }
+
     FORCE_INLINE bool test(uint32_t index) {
         uint32_t offset = index >> 3;
         ASSERT(offset < size_);
diff --git a/cpp/src/common/tablet.cc b/cpp/src/common/tablet.cc
index a5a82729..73c761c1 100644
--- a/cpp/src/common/tablet.cc
+++ b/cpp/src/common/tablet.cc
@@ -284,16 +284,24 @@ int Tablet::add_value(uint32_t row_index, const std::string &measurement_name,
 }
 
 template <>
-int Tablet::set_batch_data(uint32_t col_index, char **data) {
-    if (col_index > schema_vec_->size()) {
+int Tablet::set_batch_data(uint32_t col_index, char **data, char *mask) {
+    // Strings take no validity mask; null entries in `data` are skipped.
+    (void)mask;
+    if (data == nullptr) {
+        return common::E_INVALID_ARG;
+    }
+    if (col_index >= schema_vec_->size()) {
         return common::E_INVALID_SCHEMA;
     }
 
-    for (int i = 0; i < max_row_num_; i++) {
-        value_matrix_[col_index].string_data->dup_from(data[i],
-                                                       page_arena_);
+    for (uint32_t i = 0; i < max_row_num_; i++) {
+        // A skipped (null) entry leaves the row's bitmap bit untouched.
+        if (data[i] != nullptr) {
+            value_matrix_[col_index].string_data[i].dup_from(data[i],
+                                                             page_arena_);
+            bitmaps_[col_index].clear(i);
+        }
     }
-    bitmaps_[col_index].set_zero();
     return common::E_OK;
 }
 
diff --git a/cpp/src/common/tablet.h b/cpp/src/common/tablet.h
index aea3ddd5..20eb95c0 100644
--- a/cpp/src/common/tablet.h
+++ b/cpp/src/common/tablet.h
@@ -172,6 +172,7 @@ class Tablet {
     size_t get_column_count() const { return schema_vec_->size(); }
     uint32_t get_cur_row_size() const { return cur_row_size_; }
 
+    uint32_t get_max_row_size() const { return max_row_num_; }
     /**
      * @brief Adds a timestamp to the specified row.
      *
@@ -236,7 +237,18 @@ class Tablet {
     }
 
+    /**
+     * @brief Copies one whole column of values in a single call.
+     *
+     * @param col_index Column to fill.
+     * @param data      Column values, expected to hold max_row_num_ entries.
+     * @param mask      Packed validity mask of (max_row_num_ + 7) / 8 bytes in
+     *                  the column BitMap's layout; a set bit marks a row that
+     *                  holds a value. Left unchanged on return. nullptr marks
+     *                  every row as present.
+     * @return E_OK on success; E_INVALID_ARG if col_index is out of range.
+     */
     template <typename T>
-    int set_batch_data(uint32_t col_index, T *data) {
-        if (col_index > schema_vec_->size()) {
+    int set_batch_data(uint32_t col_index, T *data, char *mask = nullptr) {
+        if (col_index >= schema_vec_->size()) {
             return common::E_INVALID_ARG;
         }
@@ -264,23 +276,35 @@ class Tablet {
                 break;
             default:;
         }
-        bitmaps_[col_index].set_zero();
-        return common::E_OK;
-    }
-
-    int set_batch_data(uint32_t col_index, char **data) {
-        if (col_index > schema_vec_->size()) {
-            return common::E_INVALID_SCHEMA;
-        }
-
-        for (int i = 0; i < max_row_num_; i++) {
-            value_matrix_[col_index].string_data[i].dup_from(data[i],
-                                                           page_arena_);
-        }
-        bitmaps_[col_index].set_zero();
-        return common::E_OK;
-    }
+
+        if (mask == nullptr) {
+            // No mask: every row holds a value.
+            bitmaps_[col_index].set_zero();
+            return common::E_OK;
+        }
+        // The mask marks present rows while the bitmap marks null rows, so
+        // invert it for the copy and then restore the caller's buffer.
+        const uint32_t mask_bytes = (max_row_num_ + 7) / 8;
+        for (uint32_t i = 0; i < mask_bytes; i++) {
+            mask[i] = static_cast<char>(~mask[i]);
+        }
+        bitmaps_[col_index].set_bitmap(mask);
+        for (uint32_t i = 0; i < mask_bytes; i++) {
+            mask[i] = static_cast<char>(~mask[i]);
+        }
+        return common::E_OK;
+    }
+
+    /**
+     * @brief Copies max_row_num_ timestamps from `timestamp`.
+     *
+     * NOTE(review): cur_row_size_ is not updated here; confirm callers of
+     * the batch API account for the row count before writing.
+     */
+    void set_batch_timestamp(int64_t* timestamp) {
+        memcpy(timestamps_, timestamp, sizeof(int64_t) * max_row_num_);
+    }
 
     int set_null_value(uint32_t col_index, uint32_t row_index);
 
     friend class TabletColIterator;
diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc b/cpp/src/cwrapper/tsfile_cwrapper.cc
index ce61a861..4df05f6a 100644
--- a/cpp/src/cwrapper/tsfile_cwrapper.cc
+++ b/cpp/src/cwrapper/tsfile_cwrapper.cc
@@ -668,10 +668,10 @@ ERRNO _tsfile_writer_write_ts_record(TsFileWriter writer, TsRecord data) {
 }
 
 ERRNO _tablet_set_batch_data(Tablet tablet, uint32_t col_index,
-                             const void *data) {
+                             const void *data, char* mask) {
     auto tab = static_cast<storage::Tablet *>(tablet);
     int ret = 0;
-    ret = tab->set_batch_data(col_index, data);
+    ret = tab->set_batch_data(col_index, data, mask);
     return ret;
 }
 
@@ -679,8 +679,26 @@ ERRNO _tablet_set_batch_str(Tablet tablet, uint32_t col_index,
                              const char **data) {
     auto tab = static_cast<storage::Tablet *>(tablet);
     int ret = 0;
-    ret = tab->set_batch_data(col_index, data);
+    // Strings carry nulls as null pointers, so no mask is passed. The cast
+    // selects the `char **` specialization in tablet.cc (it copies each
+    // string via dup_from); NOTE(review): confirm tablet.h declares it.
+    ret = tab->set_batch_data(col_index, const_cast<char **>(data), nullptr);
     return ret;
+}
+
+ERRNO _tablet_set_batch_timestamp(Tablet tablet, int64_t* timestamp,
+                                  uint32_t max_row_num) {
+    auto tab = static_cast<storage::Tablet *>(tablet);
+    if (tab == nullptr || timestamp == nullptr) {
+        return common::E_INVALID_ARG;
+    }
+    // The whole column is copied, so exactly max_row_num timestamps must be
+    // supplied and that must match the tablet's row count.
+    if (max_row_num != tab->get_max_row_size()) {
+        return common::E_INVALID_ARG;
+    }
+    tab->set_batch_timestamp(timestamp);
+    return common::E_OK;
 }
 
 ERRNO _tablet_mark_null_value(Tablet tablet, uint32_t row_index,
diff --git a/cpp/src/cwrapper/tsfile_cwrapper.h b/cpp/src/cwrapper/tsfile_cwrapper.h
index bc7c55b5..ffa13562 100644
--- a/cpp/src/cwrapper/tsfile_cwrapper.h
+++ b/cpp/src/cwrapper/tsfile_cwrapper.h
@@ -549,8 +549,15 @@ INSERT_DATA_INTO_TS_RECORD_BY_NAME(bool);
 INSERT_DATA_INTO_TS_RECORD_BY_NAME(float);
 INSERT_DATA_INTO_TS_RECORD_BY_NAME(double);
 
-ERRNO _tablet_set_batch_data(Tablet tablet, uint32_t col_index, const void* data);
+// Copies one whole column into the tablet. `mask` is a packed validity mask
+// with one bit per row; a set bit marks a row that holds a value.
+ERRNO _tablet_set_batch_data(Tablet tablet, uint32_t col_index,
+                             const void* data, char* mask);
 ERRNO _tablet_set_batch_str(Tablet tablet, uint32_t col_index, const char** data);
+// Copies all timestamps at once. `max_row_num` must equal the tablet's
+// max row count; otherwise E_INVALID_ARG is returned.
+ERRNO _tablet_set_batch_timestamp(Tablet tablet, int64_t* timestamp,
+                                  uint32_t max_row_num);
 ERRNO _tablet_mark_null_value(Tablet tablet, uint32_t row_index, uint32_t col_index);
 
 // Write a tablet into a device.
diff --git a/cpp/test/common/tablet_test.cc b/cpp/test/common/tablet_test.cc
index fe8a5495..d2ce87d7 100644
--- a/cpp/test/common/tablet_test.cc
+++ b/cpp/test/common/tablet_test.cc
@@ -75,19 +75,24 @@ TEST(TabletTest, TabletBatchReadWrite) {
     bool_vec[10] = true;
 
     common::TSDataType datatype;
-    tablet.set_batch_data(0, bool_vec);
+    // Validity mask covering 100 rows, one bit per row: all rows present.
+    char mask[(100 + 7) / 8];
+    memset(mask, 0xff, sizeof(mask));
+    tablet.set_batch_data(0, bool_vec, mask);
     ASSERT_TRUE(*(bool*)(tablet.get_value(10, 0, datatype)));
     ASSERT_EQ(common::TSDataType::BOOLEAN, datatype);
     int32_t i32_vec[100] = {false};
     i32_vec[99] = 123;
-    tablet.set_batch_data(1, i32_vec);
+    // set_batch_data may flip the mask bits in place; refill before reuse.
+    memset(mask, 0xff, sizeof(mask));
+    tablet.set_batch_data(1, i32_vec, mask);
     ASSERT_EQ(0, *(int32_t *)(tablet.get_value(10, 1, datatype)));
     ASSERT_EQ(123, *(int32_t *)(tablet.get_value(99, 1, datatype)));
     char** str = (char**) malloc(100 * sizeof(char*));
     for (int i = 0; i < 100; i++) {
         str[i] = strdup(std::string("val" + std::to_string(i)).c_str());
     }
-    tablet.set_batch_data(5, str);
+    tablet.set_batch_data(5, str, nullptr);
     ASSERT_EQ(common::String("val10"), *(common::String*)tablet.get_value(10, 5, datatype));
 
     tablet.set_null_value(5, 20);
diff --git a/python/setup.py b/python/setup.py
index 6edeea0b..84727278 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -24,7 +24,8 @@ import platform
 import shutil
 import os
 
+# Keep the canonical PEP 440 form; "2.1.0.dev" would be normalized to it.
 version = "2.1.0.dev0"
 system = platform.system()
 
 def copy_tsfile_lib(source_dir, target_dir, suffix):
diff --git a/python/tsfile/tsfile_cpp.pxd b/python/tsfile/tsfile_cpp.pxd
index 1fea5da1..b65585cc 100644
--- a/python/tsfile/tsfile_cpp.pxd
+++ b/python/tsfile/tsfile_cpp.pxd
@@ -130,7 +130,7 @@ cdef extern from "./tsfile_cwrapper.h":
                                         TSDataType * data_types,
                                         int column_num, int max_rows);
 
-    Tablet tablet_new(const char** column_names, TSDataType * data_types, int column_num);
+    Tablet tablet_new(const char** column_names, TSDataType * data_types, int column_num, uint32_t max_rows);
 
     ErrorCode tablet_add_timestamp(Tablet tablet, uint32_t row_index, int64_t timestamp);
     ErrorCode tablet_add_value_by_index_int64_t(Tablet tablet, uint32_t row_index, uint32_t column_index,
@@ -158,9 +158,10 @@ cdef extern from "./tsfile_cwrapper.h":
 
     void _free_tsfile_ts_record(TsRecord * record);
 
-    ErrorCode _tablet_set_batch_data(Tablet tablet, uint32_t col_index, const void * data);
+    ErrorCode _tablet_set_batch_data(Tablet tablet, uint32_t col_index, const void* data, char* mask);
     ErrorCode _tablet_set_batch_str(Tablet tablet, uint32_t col_index, const char** data);
+    ErrorCode _tablet_set_batch_timestamp(Tablet tablet, int64_t* timestamp, uint32_t max_row_num);
     ErrorCode _tablet_mark_null_value(Tablet tablet, uint32_t row_index, uint32_t col_index);
 
     # resulSet : query data from tsfile reader
     ResultSet tsfile_query_table(TsFileReader reader,
diff --git a/python/tsfile/tsfile_writer.pyx b/python/tsfile/tsfile_writer.pyx
index a670e5cb..64cab0a0 100644
--- a/python/tsfile/tsfile_writer.pyx
+++ b/python/tsfile/tsfile_writer.pyx
@@ -17,6 +17,11 @@
 #
 
 #cython: language_level=3
+import pandas as pd
+import numpy as np
+
+from libc.stdlib cimport free
+from libc.stdlib cimport malloc
 
 from .tsfile_cpp cimport *
 from .tsfile_py_cpp cimport *
@@ -25,6 +30,254 @@ from tsfile.row_record import RowRecord
 from tsfile.schema import TimeseriesSchema as TimeseriesSchemaPy, DeviceSchema as DeviceSchemaPy
 from tsfile.schema import TableSchema as TableSchemaPy
 from tsfile.tablet import Tablet as TabletPy
+from tsfile.constants import TSDataType as TSDataTypePy
+
+_pandas_dtype_to_ts = {
+    "bool": TSDataTypePy.BOOLEAN,
+    "int32": TSDataTypePy.INT32,
+    "int64": TSDataTypePy.INT64,
+    "float32": TSDataTypePy.FLOAT,
+    "float64": TSDataTypePy.DOUBLE,
+    "string": TSDataTypePy.STRING
+}
+
+
+def _ts_type_of(series):
+    """Map the dtype of a pandas Series to a TSDataTypePy, or None.
+
+    Dtypes listed in ``_pandas_dtype_to_ts`` map directly; any other
+    string-like dtype (including ``object``) is treated as STRING.
+    """
+    ts_type = _pandas_dtype_to_ts.get(str(series.dtype))
+    if ts_type is None and pd.api.types.is_string_dtype(series.dtype):
+        ts_type = TSDataTypePy.STRING
+    return ts_type
+
+
+cdef bint is_compatible(object expected, object actual):
+    """Return True if a column of type ``actual`` may be written to a column
+    declared as ``expected``: same type, or a lossless widening."""
+    if expected == actual:
+        return True
+    if expected == TSDataTypePy.INT64 and actual == TSDataTypePy.INT32:
+        return True
+    if expected == TSDataTypePy.DOUBLE and actual == TSDataTypePy.FLOAT:
+        return True
+    return False
+
+
+def convert_series(series, target) -> np.ndarray:
+    """Return ``series`` as a numpy array whose dtype matches ``target``.
+
+    Raises TypeError if ``target`` has no numpy counterpart below.
+    """
+    dtype_map = {
+        TSDataTypePy.INT64: "int64",
+        TSDataTypePy.INT32: "int32",
+        TSDataTypePy.FLOAT: "float32",
+        TSDataTypePy.DOUBLE: "float64",
+        TSDataTypePy.BOOLEAN: "bool",
+        TSDataTypePy.STRING: "str",
+    }
+
+    target_str = dtype_map.get(target)
+    if target_str is None:
+        raise TypeError(f"Unsupported target type {target}")
+    if str(series.dtype) == target_str:
+        return series.to_numpy()
+    return series.astype(target_str).to_numpy()
+
+
+def encode_or_null(x):
+    """Encode ``x`` as UTF-8 bytes, or return None for a missing value."""
+    if pd.isna(x):
+        return None
+    return str(x).encode('utf-8')
+
+
+cdef class CTablet:
+    """Native tablet filled column by column from a pandas DataFrame.
+
+    ``from_data_frame`` expects an int64 ``time`` column plus exactly the
+    columns named in ``column_name``.
+    """
+    cdef Tablet tablet
+    cdef object column_name
+    cdef object data_type
+    cdef object max_row_num
+    cdef char** column_names
+    cdef int column_num
+    cdef TSDataType * column_data_types
+    # UTF-8 encoded names; column_names points into these bytes objects,
+    # so they must stay alive as long as this instance.
+    cdef list _name_bytes
+
+    def __init__(self, column_name: list[str], data_types: list[TSDataTypePy],
+                 max_row_num: int = 1024):
+        cdef bytes encoded
+        cdef size_t alloc_num
+        cdef Py_ssize_t ind
+        column_num = len(column_name)
+        if len(data_types) != column_num:
+            raise ValueError("Length of column_name and data_types must be equal")
+        if max_row_num <= 0:
+            raise ValueError("max_row_num must be positive")
+
+        self.column_name = list(column_name)
+        self.data_type = list(data_types)
+        self.max_row_num = max_row_num
+        # __init__ can be called again on a live object; drop the old arrays.
+        self._free_schema()
+        self._name_bytes = [name.encode('utf-8') for name in self.column_name]
+        # malloc(0) may return NULL, so always request at least one slot.
+        alloc_num = column_num if column_num > 0 else 1
+        self.column_names = <char**> malloc(sizeof(char*) * alloc_num)
+        self.column_data_types = <TSDataType*> malloc(sizeof(TSDataType) * alloc_num)
+        if self.column_names == NULL or self.column_data_types == NULL:
+            self._free_schema()
+            raise MemoryError()
+
+        for ind, dtype in enumerate(self.data_type):
+            encoded = self._name_bytes[ind]
+            self.column_names[ind] = encoded
+            self.column_data_types[ind] = to_c_data_type(dtype)
+        self.column_num = column_num
+
+    def __dealloc__(self):
+        self._release_tablet()
+        self._free_schema()
+
+    cdef void _release_tablet(self):
+        """Free the native tablet, if any."""
+        if self.tablet != NULL:
+            # NOTE(review): free_tablet is assumed to take a pointer to the
+            # handle, like _free_tsfile_ts_record -- confirm in the .pxd.
+            free_tablet(&self.tablet)
+            self.tablet = NULL
+
+    cdef void _free_schema(self):
+        """Free the C arrays describing the column schema."""
+        if self.column_names != NULL:
+            free(self.column_names)
+            self.column_names = NULL
+        if self.column_data_types != NULL:
+            free(self.column_data_types)
+            self.column_data_types = NULL
+        self.column_num = 0
+
+    cdef init_c_tablet(self, row_num=None):
+        """(Re)create the native tablet with ``row_num`` rows.
+
+        ``row_num`` defaults to ``max_row_num``; a previous tablet is freed.
+        """
+        if row_num is None:
+            row_num = self.max_row_num
+        self._release_tablet()
+        self.tablet = tablet_new(<const char**> self.column_names,
+                                 self.column_data_types, self.column_num,
+                                 row_num)
+        if self.tablet == NULL:
+            raise MemoryError("tablet_new failed")
+
+    cpdef from_data_frame(self, data_frame: pd.DataFrame):
+        """Rebuild the native tablet from ``data_frame``.
+
+        The tablet is sized to ``len(data_frame)`` rows, which must be in
+        ``[1, max_row_num]``. Missing values (NaN/None/NA) become nulls.
+
+        Raises:
+            TypeError, ValueError, KeyError: the frame does not match the schema.
+            RuntimeError: a native batch call reported an error code.
+        """
+        cdef const int64_t[::1] ts_view
+        cdef const unsigned char[::1] value_view
+        cdef unsigned char[::1] mask_view
+        cdef const char** str_ptr = NULL
+        cdef bytes item
+        cdef Py_ssize_t row_num
+        cdef Py_ssize_t row
+        cdef int col_index
+        cdef ErrorCode ret
+
+        if not isinstance(data_frame, pd.DataFrame):
+            raise TypeError("Input must be a pandas DataFrame")
+        if data_frame.shape[1] != len(self.column_name) + 1:
+            raise ValueError(f"DataFrame column count {data_frame.shape[1]} doesn't match expected {len(self.column_name) + 1}")
+
+        if "time" not in data_frame.columns:
+            raise ValueError("Missing required column: 'time'")
+        if not pd.api.types.is_integer_dtype(data_frame["time"]):
+            raise TypeError("Column 'time' must be of integer type")
+        if data_frame["time"].dtype != np.int64:
+            raise TypeError(f"Column 'time' must be int64, but got {data_frame['time'].dtype}")
+
+        row_num = len(data_frame)
+        if row_num == 0:
+            raise ValueError("DataFrame must contain at least one row")
+        if row_num > self.max_row_num:
+            raise ValueError(
+                f"DataFrame has {row_num} rows, more than max_row_num={self.max_row_num}")
+
+        # Validate every column first so a bad frame never leaves a
+        # half-filled native tablet behind.
+        columns = []
+        for col_index, col_name in enumerate(self.column_name):
+            if col_name not in data_frame.columns:
+                raise KeyError(f"Column '{col_name}' missing from DataFrame")
+            series = data_frame[col_name]
+            dtype_str = str(series.dtype)
+            actual_ts_type = _ts_type_of(series)
+            if actual_ts_type is None:
+                raise TypeError(f"Unsupported pandas dtype {dtype_str} for column {col_name}")
+            expected_ts_type = self.data_type[col_index]
+            if not is_compatible(expected_ts_type, actual_ts_type):
+                raise TypeError(
+                    f"Column '{col_name}' type mismatch: expected {expected_ts_type.name}, got {actual_ts_type.name}")
+            columns.append((col_index, col_name, expected_ts_type, series))
+
+        self.init_c_tablet(row_num)
+
+        ts_view = np.ascontiguousarray(data_frame["time"].to_numpy(), dtype=np.int64)
+        ret = _tablet_set_batch_timestamp(self.tablet, <int64_t*> &ts_view[0],
+                                          <uint32_t> row_num)
+        if ret != 0:
+            raise RuntimeError(f"Failed to set timestamps, error code {ret}")
+
+        for col_index, col_name, ts_type, series in columns:
+            if ts_type == TSDataTypePy.STRING:
+                # `encoded` owns the bytes that str_ptr points into, so it
+                # must outlive the native call (which copies the strings).
+                encoded = series.apply(encode_or_null).tolist()
+                str_ptr = <const char**> malloc(sizeof(char*) * row_num)
+                if str_ptr == NULL:
+                    raise MemoryError()
+                try:
+                    for row in range(row_num):
+                        item = encoded[row]
+                        if item is None:
+                            str_ptr[row] = NULL
+                        else:
+                            str_ptr[row] = item
+                    ret = _tablet_set_batch_str(self.tablet, col_index, str_ptr)
+                finally:
+                    # Only the pointer array is ours; never free the strings.
+                    free(str_ptr)
+                    str_ptr = NULL
+            else:
+                values = np.ascontiguousarray(convert_series(series, ts_type))
+                value_view = values.view(np.uint8)
+                # Packed validity mask: one bit per row, set = value present.
+                # NOTE(review): assumes the native BitMap is LSB-first within
+                # each byte -- confirm against BitMap::test.
+                mask_view = np.packbits(series.notna().to_numpy(dtype=bool),
+                                        bitorder="little")
+                ret = _tablet_set_batch_data(self.tablet, col_index,
+                                             <const void*> &value_view[0],
+                                             <char*> &mask_view[0])
+            if ret != 0:
+                raise RuntimeError(
+                    f"Failed to set data of column '{col_name}', error code {ret}")
+
 
 cdef class TsFileWriterPy:
     cdef TsFileWriter writer

Reply via email to