This is an automated email from the ASF dual-hosted git repository.
colinlee pushed a commit to branch colin_refine_write
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/colin_refine_write by this
push:
new 81a83ab7 add batch read.
81a83ab7 is described below
commit 81a83ab7f85a0e7e6c921437d861097cfb107abe
Author: ColinLee <[email protected]>
AuthorDate: Fri Apr 25 15:18:22 2025 +0800
add batch read.
---
cpp/src/common/container/bit_map.h | 5 ++
cpp/src/common/tablet.cc | 10 +--
cpp/src/common/tablet.h | 23 +++---
cpp/src/cwrapper/tsfile_cwrapper.cc | 15 +++-
cpp/src/cwrapper/tsfile_cwrapper.h | 3 +-
cpp/test/common/tablet_test.cc | 13 +++-
python/setup.py | 2 +-
python/tsfile/tsfile_cpp.pxd | 6 +-
python/tsfile/tsfile_writer.pyx | 136 ++++++++++++++++++++++++++++++++++++
9 files changed, 185 insertions(+), 28 deletions(-)
diff --git a/cpp/src/common/container/bit_map.h
b/cpp/src/common/container/bit_map.h
index c2f8f4e5..5a2b2f26 100644
--- a/cpp/src/common/container/bit_map.h
+++ b/cpp/src/common/container/bit_map.h
@@ -59,6 +59,11 @@ class BitMap {
memset(bitmap_, 0x00, size_);
}
+    /**
+     * @brief Overwrites this bitmap with the bytes stored at @p bitmap.
+     *
+     * Exactly size_ bytes are copied, so the caller's buffer must hold at
+     * least that many bytes. The source buffer is only read, hence const.
+     *
+     * @param bitmap Source bytes; must not be null.
+     */
+    FORCE_INLINE void set_bitmap(const char* bitmap) {
+        ASSERT(bitmap != nullptr);
+        memcpy(bitmap_, bitmap, size_);
+    }
+
+
FORCE_INLINE bool test(uint32_t index) {
uint32_t offset = index >> 3;
ASSERT(offset < size_);
diff --git a/cpp/src/common/tablet.cc b/cpp/src/common/tablet.cc
index a5a82729..73c761c1 100644
--- a/cpp/src/common/tablet.cc
+++ b/cpp/src/common/tablet.cc
@@ -284,16 +284,18 @@ int Tablet::add_value(uint32_t row_index, const
std::string &measurement_name,
}
+/**
+ * @brief Batch-loads one string column from an array of C strings.
+ *
+ * For every row in [0, max_row_num_) whose entry in @p data is non-null, the
+ * string is duplicated into page_arena_ and the row's bit in the column
+ * bitmap is cleared. Rows whose entry is null are left untouched (neither
+ * the value nor the bitmap bit is written).
+ *
+ * @param col_index Zero-based column index; must be < schema_vec_->size().
+ * @param data      Array of max_row_num_ C-string pointers; entries may be
+ *                  null.
+ * @param mask      Not read by this specialization; present only so the
+ *                  signature matches the generic template.
+ * @return common::E_INVALID_SCHEMA for an out-of-range column,
+ *         common::E_INVALID_ARG when @p data is null, else common::E_OK.
+ */
template <>
-int Tablet::set_batch_data(uint32_t col_index, char **data) {
+int Tablet::set_batch_data(uint32_t col_index, char **data, char *mask) {
-    if (col_index > schema_vec_->size()) {
+    // Valid indexes are 0 .. size() - 1. The previous `>` test let
+    // col_index == size() through and indexed one past the last column.
+    if (col_index >= schema_vec_->size()) {
return common::E_INVALID_SCHEMA;
}
+    if (data == nullptr) {
+        return common::E_INVALID_ARG;
+    }
for (int i = 0; i < max_row_num_; i++) {
- value_matrix_[col_index].string_data->dup_from(data[i],
- page_arena_);
+        if (data[i] != nullptr) {
+            value_matrix_[col_index].string_data[i].dup_from(data[i],
+                                                             page_arena_);
+            bitmaps_[col_index].clear(i);
+        }
}
- bitmaps_[col_index].set_zero();
return common::E_OK;
}
diff --git a/cpp/src/common/tablet.h b/cpp/src/common/tablet.h
index aea3ddd5..20eb95c0 100644
--- a/cpp/src/common/tablet.h
+++ b/cpp/src/common/tablet.h
@@ -172,6 +172,7 @@ class Tablet {
size_t get_column_count() const { return schema_vec_->size(); }
uint32_t get_cur_row_size() const { return cur_row_size_; }
+ /** @brief Returns max_row_num_, the row count the batch setters of this class loop or copy over. */
+ uint32_t get_max_row_size() const { return max_row_num_; }
/**
* @brief Adds a timestamp to the specified row.
*
@@ -236,7 +237,7 @@ class Tablet {
}
template <typename T>
- int set_batch_data(uint32_t col_index, T *data) {
+ int set_batch_data(uint32_t col_index, T *data, char *mask) {
if (col_index > schema_vec_->size()) {
return common::E_INVALID_ARG;
}
@@ -264,23 +265,19 @@ class Tablet {
break;
default:;
}
- bitmaps_[col_index].set_zero();
- return common::E_OK;
- }
- int set_batch_data(uint32_t col_index, char **data) {
- if (col_index > schema_vec_->size()) {
- return common::E_INVALID_SCHEMA;
+ int size = (max_row_num_ + 7) / 8;
+ for (int i = 0; i < size; i++) {
+ mask[i] = ~mask[i];
}
-
- for (int i = 0; i < max_row_num_; i++) {
- value_matrix_[col_index].string_data[i].dup_from(data[i],
- page_arena_);
- }
- bitmaps_[col_index].set_zero();
+ bitmaps_[col_index].set_bitmap(mask);
return common::E_OK;
}
+    /**
+     * @brief Bulk-copies max_row_num_ timestamps into the tablet.
+     *
+     * NOTE(review): cur_row_size_ is not touched here; confirm whether
+     * callers expect the row count to be updated after a batch load.
+     *
+     * @param timestamp Source array; must hold at least max_row_num_
+     *                  int64_t values. Only read.
+     */
+    void set_batch_timestamp(const int64_t* timestamp) {
+        // memcpy takes a byte count, not an element count. Passing the bare
+        // row count copied only the first max_row_num_ / 8 timestamps.
+        memcpy(timestamps_, timestamp,
+               static_cast<size_t>(max_row_num_) * sizeof(*timestamp));
+    }
+
int set_null_value(uint32_t col_index, uint32_t row_index);
friend class TabletColIterator;
diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc
b/cpp/src/cwrapper/tsfile_cwrapper.cc
index ce61a861..4df05f6a 100644
--- a/cpp/src/cwrapper/tsfile_cwrapper.cc
+++ b/cpp/src/cwrapper/tsfile_cwrapper.cc
@@ -668,10 +668,10 @@ ERRNO _tsfile_writer_write_ts_record(TsFileWriter writer,
TsRecord data) {
}
+// C entry point that fills one whole column of a tablet in a single call.
+// Casts the opaque handle to storage::Tablet and forwards col_index, data
+// and mask to Tablet::set_batch_data, returning its error code unchanged.
+// Per the tablet.h hunk of this patch, the callee inverts the first
+// (max_row_num + 7) / 8 bytes of mask in place before installing them as the
+// column bitmap, so mask must be writable and at least that long.
ERRNO _tablet_set_batch_data(Tablet tablet, uint32_t col_index,
- const void *data) {
+ const void *data, char* mask) {
auto tab = static_cast<storage::Tablet *>(tablet);
int ret = 0;
- ret = tab->set_batch_data(col_index, data);
+ ret = tab->set_batch_data(col_index, data, mask);
return ret;
}
@@ -679,8 +679,17 @@ ERRNO _tablet_set_batch_str(Tablet tablet, uint32_t
col_index,
const char **data) {
auto tab = static_cast<storage::Tablet *>(tablet);
int ret = 0;
- ret = tab->set_batch_data(col_index, data);
+ ret = tab->set_batch_data(col_index, data, nullptr);
return ret;
+};
+
+/**
+ * @brief C entry point that loads a tablet's whole timestamp column.
+ *
+ * The caller states how many timestamps its buffer holds; the call is
+ * rejected unless that equals the tablet's get_max_row_size(), because the
+ * underlying copy always transfers that many entries.
+ *
+ * @param tablet      Opaque handle to a storage::Tablet; must not be null.
+ * @param timestamp   Buffer of max_row_num int64_t values; must not be null.
+ * @param max_row_num Number of entries in @p timestamp.
+ * @return common::E_INVALID_ARG for a null argument or a size mismatch,
+ *         otherwise common::E_OK.
+ */
+ERRNO _tablet_set_batch_timestamp(Tablet tablet, int64_t* timestamp,
+                                  uint32_t max_row_num) {
+    // Reject null inputs up front: both are dereferenced below.
+    if (tablet == nullptr || timestamp == nullptr) {
+        return common::E_INVALID_ARG;
+    }
+    auto tab = static_cast<storage::Tablet *>(tablet);
+    if (max_row_num != tab->get_max_row_size()) {
+        return common::E_INVALID_ARG;
+    }
+    tab->set_batch_timestamp(timestamp);
+    return common::E_OK;
}
ERRNO _tablet_mark_null_value(Tablet tablet, uint32_t row_index,
diff --git a/cpp/src/cwrapper/tsfile_cwrapper.h
b/cpp/src/cwrapper/tsfile_cwrapper.h
index bc7c55b5..ffa13562 100644
--- a/cpp/src/cwrapper/tsfile_cwrapper.h
+++ b/cpp/src/cwrapper/tsfile_cwrapper.h
@@ -549,8 +549,9 @@ INSERT_DATA_INTO_TS_RECORD_BY_NAME(bool);
INSERT_DATA_INTO_TS_RECORD_BY_NAME(float);
INSERT_DATA_INTO_TS_RECORD_BY_NAME(double);
-ERRNO _tablet_set_batch_data(Tablet tablet, uint32_t col_index, const void*
data);
+ERRNO _tablet_set_batch_data(Tablet tablet, uint32_t col_index, const void*
data, char* mask);
ERRNO _tablet_set_batch_str(Tablet tablet, uint32_t col_index, const char**
data);
+ERRNO _tablet_set_batch_timestamp(Tablet tablet, int64_t* timestamp, uint32_t
max_row_num);
ERRNO _tablet_mark_null_value(Tablet tablet, uint32_t row_index, uint32_t
col_index);
// Write a tablet into a device.
diff --git a/cpp/test/common/tablet_test.cc b/cpp/test/common/tablet_test.cc
index fe8a5495..d2ce87d7 100644
--- a/cpp/test/common/tablet_test.cc
+++ b/cpp/test/common/tablet_test.cc
@@ -75,19 +75,26 @@ TEST(TabletTest, TabletBatchReadWrite) {
bool_vec[10] = true;
common::TSDataType datatype;
- tablet.set_batch_data(0, bool_vec);
+ char* mask = new char[(100 + 7)/8];
+ for (int i = 0; i < (100 + 7)/8; i++) {
+ mask[i] = 0xff;
+ }
+ tablet.set_batch_data(0, bool_vec, mask);
ASSERT_TRUE(*(bool*)(tablet.get_value(10, 0, datatype)));
ASSERT_EQ(common::TSDataType::BOOLEAN, datatype);
int32_t i32_vec[100] = {false};
i32_vec[99] = 123;
- tablet.set_batch_data(1, i32_vec);
+ for (int i = 0; i < (100 + 7)/8; i++) {
+ mask[i] = 0xff;
+ }
+ tablet.set_batch_data(1, i32_vec, mask);
ASSERT_EQ(0, *(int32_t *)(tablet.get_value(10, 1, datatype)));
ASSERT_EQ(123, *(int32_t *)(tablet.get_value(99, 1, datatype)));
char** str = (char**) malloc(100 * sizeof(char*));
for (int i = 0; i < 100; i++) {
str[i] = strdup(std::string("val" + std::to_string(i)).c_str());
}
- tablet.set_batch_data(5, str);
+ tablet.set_batch_data(5, str, nullptr);
ASSERT_EQ(common::String("val10"), *(common::String*)tablet.get_value(10,
5, datatype));
tablet.set_null_value(5, 20);
diff --git a/python/setup.py b/python/setup.py
index 6edeea0b..84727278 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -24,7 +24,7 @@ import platform
import shutil
import os
-version = "2.1.0.dev0"
+version = "2.1.0.dev"
system = platform.system()
def copy_tsfile_lib(source_dir, target_dir, suffix):
diff --git a/python/tsfile/tsfile_cpp.pxd b/python/tsfile/tsfile_cpp.pxd
index 1fea5da1..b65585cc 100644
--- a/python/tsfile/tsfile_cpp.pxd
+++ b/python/tsfile/tsfile_cpp.pxd
@@ -130,7 +130,7 @@ cdef extern from "./tsfile_cwrapper.h":
TSDataType * data_types,
int column_num, int max_rows);
- Tablet tablet_new(const char** column_names, TSDataType * data_types, int
column_num);
+ Tablet tablet_new(const char** column_names, TSDataType * data_types, int
column_num, uint32_t max_rows);
ErrorCode tablet_add_timestamp(Tablet tablet, uint32_t row_index, int64_t
timestamp);
ErrorCode tablet_add_value_by_index_int64_t(Tablet tablet, uint32_t
row_index, uint32_t column_index,
@@ -158,9 +158,9 @@ cdef extern from "./tsfile_cwrapper.h":
void _free_tsfile_ts_record(TsRecord * record);
- ErrorCode _tablet_set_batch_data(Tablet tablet, uint32_t col_index, const
void * data);
+ ErrorCode _tablet_set_batch_data(Tablet tablet, uint32_t col_index, const
void* data, char* mask);
ErrorCode _tablet_set_batch_str(Tablet tablet, uint32_t col_index, const
char** data);
- ErrorCode _tablet_mark_null_value(Tablet tablet, uint32_t row_index,
uint32_t col_index);
+ ErrorCode _tablet_set_batch_timestamp(Tablet tablet, int64_t* timestamp,
uint32_t max_row_num);
# resulSet : query data from tsfile reader
ResultSet tsfile_query_table(TsFileReader reader,
diff --git a/python/tsfile/tsfile_writer.pyx b/python/tsfile/tsfile_writer.pyx
index a670e5cb..64cab0a0 100644
--- a/python/tsfile/tsfile_writer.pyx
+++ b/python/tsfile/tsfile_writer.pyx
@@ -17,6 +17,11 @@
#
#cython: language_level=3
+import pandas as pd
+import numpy as np
+
+from libc.stdlib cimport free
+from libc.stdlib cimport malloc
from .tsfile_cpp cimport *
from .tsfile_py_cpp cimport *
@@ -25,6 +30,137 @@ from tsfile.row_record import RowRecord
from tsfile.schema import TimeseriesSchema as TimeseriesSchemaPy, DeviceSchema
as DeviceSchemaPy
from tsfile.schema import TableSchema as TableSchemaPy
from tsfile.tablet import Tablet as TabletPy
+from tsfile.constants import TSDataType as TSDataTypePy
+
+# Maps str(series.dtype) of a pandas column to the TsFile data type that
+# CTablet.from_data_frame compares against the declared column type.
+# Only the dtype strings listed here are accepted; from_data_frame raises
+# TypeError for any column whose dtype string is not a key of this dict.
+_pandas_dtype_to_ts = {
+ "bool": TSDataTypePy.BOOLEAN,
+ "int32": TSDataTypePy.INT32,
+ "int64": TSDataTypePy.INT64,
+ "float32": TSDataTypePy.FLOAT,
+ "float64": TSDataTypePy.DOUBLE,
+ "string": TSDataTypePy.STRING
+}
+
+cdef bint is_compatible(TSDataTypePy expected, TSDataTypePy actual):
+    """Tell whether data of type `actual` may fill a column declared `expected`.
+
+    Identical types always match. Beyond that, INT32 is accepted where INT64
+    is expected and FLOAT is accepted where DOUBLE is expected.
+    """
+    widening_pairs = (
+        (TSDataTypePy.INT64, TSDataTypePy.INT32),
+        (TSDataTypePy.DOUBLE, TSDataTypePy.FLOAT),
+    )
+    return expected == actual or (expected, actual) in widening_pairs
+
+def convert_series(pd.Series series, TSDataTypePy target) -> np.ndarray:
+    """Return the values of `series` as a NumPy array of the dtype matching `target`.
+
+    The series is converted with `astype` only when its current dtype string
+    differs from the one mapped to `target`.
+
+    Raises:
+        TypeError: if `target` has no NumPy dtype mapping.
+    """
+    dtype_map = {
+        TSDataTypePy.INT64: "int64",
+        TSDataTypePy.INT32: "int32",
+        TSDataTypePy.FLOAT: "float32",
+        TSDataTypePy.DOUBLE: "float64",
+        TSDataTypePy.BOOLEAN: "bool",
+        TSDataTypePy.STRING: "str",
+    }
+
+    target_str = dtype_map.get(target)
+    if target_str is None:
+        # Without this check an unmapped target reached series.astype(None)
+        # instead of failing, so the caller never learned the type was
+        # unsupported.
+        raise TypeError(f"Unsupported target data type: {target}")
+    if str(series.dtype) == target_str:
+        return series.to_numpy()
+    return series.astype(target_str).to_numpy()
+def encode_or_null(x):
+    """Return str(x) encoded as UTF-8 bytes, or None when pd.isna(x) is true."""
+    return None if pd.isna(x) else str(x).encode('utf-8')
+
+cdef class CTablet:
+    """Owns a C `Tablet` handle and fills it column-by-column from a pandas DataFrame.
+
+    The column names and data types given at construction are mirrored into
+    C arrays that live as long as this object; the C tablet itself is
+    (re)created on every `from_data_frame` call.
+    """
+    cdef Tablet tablet
+    cdef object column_name
+    cdef object data_type
+    cdef object max_row_num
+    cdef char** column_names
+    cdef int column_num
+    cdef TSDataType * column_data_types
+    # Encoded column names; keeps the bytes that column_names points into alive.
+    cdef object encoded_names
+
+    def __init__(self, column_name: list[str], data_types: list[TSDataTypePy], max_row_num: int = 1024):
+        """Record the schema and build the C-side name / type arrays.
+
+        Raises:
+            ValueError: if `column_name` and `data_types` differ in length.
+            MemoryError: if a C array cannot be allocated.
+        """
+        cdef int ind
+        cdef bytes encoded
+
+        if len(data_types) != len(column_name):
+            raise ValueError("Length of column_name and data_types must be equal")
+
+        self.column_name = column_name
+        self.data_type = data_types
+        self.max_row_num = max_row_num
+        # These must be stored on the instance: init_c_tablet reads them later.
+        # Assigning to plain locals (as before) left the attributes NULL / 0.
+        self.column_num = len(column_name)
+        self.encoded_names = [name.encode('utf-8') for name in column_name]
+
+        self.column_names = <char**> malloc(sizeof(char*) * self.column_num)
+        if self.column_num > 0 and self.column_names == NULL:
+            raise MemoryError()
+        self.column_data_types = <TSDataType*> malloc(sizeof(TSDataType) * self.column_num)
+        if self.column_num > 0 and self.column_data_types == NULL:
+            raise MemoryError()
+
+        for ind in range(self.column_num):
+            # The pointer stays valid because encoded_names holds the bytes
+            # object, so no per-name copy (and no per-name free) is needed.
+            encoded = self.encoded_names[ind]
+            self.column_names[ind] = encoded
+            self.column_data_types[ind] = to_c_data_type(data_types[ind])
+
+    def __dealloc__(self):
+        # Release everything this object allocated on the C side.
+        if self.tablet != NULL:
+            free_tablet(self.tablet)
+            self.tablet = NULL
+        free(self.column_names)
+        free(self.column_data_types)
+
+    cdef init_c_tablet(self):
+        """Drop any previous C tablet and create a fresh one from the stored schema."""
+        if self.tablet != NULL:
+            # NOTE(review): call shape kept from the original code; confirm
+            # whether free_tablet expects the handle or a pointer to it.
+            free_tablet(self.tablet)
+            self.tablet = NULL
+        # Keep the returned handle; previously the result was discarded, so
+        # self.tablet stayed NULL for every later call.
+        self.tablet = tablet_new(<const char**> self.column_names, self.column_data_types,
+                                 self.column_num, self.max_row_num)
+        if self.tablet == NULL:
+            raise MemoryError("tablet_new returned NULL")
+
+    cpdef from_data_frame(self, data_frame: pd.DataFrame):
+        """Load `data_frame` into a freshly created C tablet.
+
+        The frame must contain an int64 column named 'time' plus exactly the
+        columns given at construction, and exactly `max_row_num` rows (the C
+        batch setters always copy that many entries).
+
+        Raises:
+            TypeError: wrong input type, wrong 'time' dtype, or an
+                unsupported / incompatible column dtype.
+            ValueError: wrong column count, missing 'time', or wrong row count.
+            KeyError: a declared column is missing from the frame.
+            RuntimeError: a C batch setter reported an error.
+        """
+        cdef void * data_ptr
+        cdef char * mask_ptr
+        cdef size_t addr
+        cdef char** str_ptr
+        cdef bytes item
+        cdef Py_ssize_t row
+
+        if not isinstance(data_frame, pd.DataFrame):
+            raise TypeError("Input must be a pandas DataFrame")
+        if data_frame.shape[1] != len(self.column_name) + 1:
+            raise ValueError(f"DataFrame column count {data_frame.shape[1]} doesn't match expected {len(self.column_name) + 1}")
+
+        if "time" not in data_frame.columns:
+            raise ValueError("Missing required column: 'time'")
+        if not pd.api.types.is_integer_dtype(data_frame["time"]):
+            raise TypeError("Column 'time' must be of integer type")
+        if data_frame["time"].dtype != np.int64:
+            raise TypeError(f"Column 'time' must be int64, but got {data_frame['time'].dtype}")
+        # The C side copies max_row_num entries per column unconditionally,
+        # so a shorter frame would be read past its end.
+        if len(data_frame) != self.max_row_num:
+            raise ValueError(f"DataFrame row count {len(data_frame)} doesn't match tablet max_row_num {self.max_row_num}")
+
+        self.init_c_tablet()
+
+        # ndarray.ctypes.data is the integer address of the buffer; casting
+        # the ndarray.data attribute itself would cast a Python object.
+        time_array = np.ascontiguousarray(data_frame["time"].to_numpy(), dtype=np.int64)
+        addr = time_array.ctypes.data
+        # assumes 0 is E_OK -- TODO confirm against the C error definitions
+        if _tablet_set_batch_timestamp(self.tablet, <int64_t*> addr, self.max_row_num) != 0:
+            raise RuntimeError("Failed to set tablet timestamps")
+
+        for col_idx, col_name in enumerate(self.column_name):
+            if col_name not in data_frame.columns:
+                raise KeyError(f"Column '{col_name}' missing from DataFrame")
+            series = data_frame[col_name]
+            dtype_str = str(series.dtype)
+            if dtype_str not in _pandas_dtype_to_ts:
+                raise TypeError(f"Unsupported pandas dtype {dtype_str} for column {col_name}")
+            actual_ts_type = _pandas_dtype_to_ts[dtype_str]
+            expected_ts_type = self.data_type[col_idx]
+            if not is_compatible(expected_ts_type, actual_ts_type):
+                raise TypeError(
+                    f"Column '{col_name}' type mismatch: expected {expected_ts_type.name}, got {actual_ts_type.name}")
+
+            if expected_ts_type == TSDataTypePy.STRING:
+                # Encode straight from the series: filling NA with "" first
+                # would turn every null into an empty string.
+                encoded = [encode_or_null(value) for value in series]
+                str_ptr = <char**> malloc(sizeof(char*) * self.max_row_num)
+                if str_ptr == NULL:
+                    raise MemoryError()
+                try:
+                    # A separate row index keeps col_idx intact for the call below.
+                    for row in range(self.max_row_num):
+                        item = encoded[row]
+                        if item is None:
+                            str_ptr[row] = NULL
+                        else:
+                            str_ptr[row] = item
+                    ret = _tablet_set_batch_str(self.tablet, col_idx, <const char**> str_ptr)
+                finally:
+                    # Only the pointer array was malloc'ed; the strings belong
+                    # to the bytes objects in `encoded` and must not be freed.
+                    free(str_ptr)
+            else:
+                array = np.ascontiguousarray(convert_series(series, expected_ts_type))
+                # The C++ side consumes a bit-packed mask of
+                # (max_row_num + 7) / 8 bytes (1 = value present) and inverts
+                # it in place, so pass a fresh, writable packed buffer.
+                # NOTE(review): bitorder="little" assumes BitMap addresses row
+                # i as bit (i & 7) of byte (i >> 3) -- confirm in bit_map.h.
+                mask = np.packbits(series.notna().to_numpy(), bitorder="little")
+                addr = array.ctypes.data
+                data_ptr = <void*> addr
+                addr = mask.ctypes.data
+                mask_ptr = <char*> addr
+                ret = _tablet_set_batch_data(self.tablet, col_idx, data_ptr, mask_ptr)
+
+            # assumes 0 is E_OK -- TODO confirm against the C error definitions
+            if ret != 0:
+                raise RuntimeError(f"Failed to set batch data for column '{col_name}' (error code {ret})")
+
+
+
+
cdef class TsFileWriterPy:
cdef TsFileWriter writer