This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3684dbfe add memory threshold in py writer. (#466)
3684dbfe is described below

commit 3684dbfe7659772ea2c958b81bf944fd8322b02a
Author: Colin Lee <[email protected]>
AuthorDate: Tue Apr 22 10:48:56 2025 +0800

    add memory threshold in py writer. (#466)
---
 cpp/src/cwrapper/tsfile_cwrapper.cc | 4 +++-
 cpp/src/cwrapper/tsfile_cwrapper.h  | 5 +++--
 python/tsfile/tsfile_cpp.pxd        | 5 +++--
 python/tsfile/tsfile_py_cpp.pxd     | 2 +-
 python/tsfile/tsfile_py_cpp.pyx     | 4 ++--
 python/tsfile/tsfile_writer.pyx     | 4 ++--
 6 files changed, 14 insertions(+), 10 deletions(-)

diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc b/cpp/src/cwrapper/tsfile_cwrapper.cc
index 371e8ced..9e5e31a8 100644
--- a/cpp/src/cwrapper/tsfile_cwrapper.cc
+++ b/cpp/src/cwrapper/tsfile_cwrapper.cc
@@ -561,7 +561,8 @@ void free_write_file(WriteFile *write_file) {
 }
 
 // For Python API
-TsFileWriter _tsfile_writer_new(const char *pathname, ERRNO *err_code) {
+TsFileWriter _tsfile_writer_new(const char *pathname, uint64_t memory_threshold,
+                                ERRNO *err_code) {
     init_tsfile_config();
     auto writer = new storage::TsFileWriter();
     int flags = O_WRONLY | O_CREAT | O_TRUNC;
@@ -569,6 +570,7 @@ TsFileWriter _tsfile_writer_new(const char *pathname, ERRNO *err_code) {
     flags |= O_BINARY;
 #endif
     int ret = writer->open(pathname, flags, 0644);
+    common::g_config_value_.chunk_group_size_threshold_ = memory_threshold;
     if (ret != common::E_OK) {
         delete writer;
         *err_code = ret;
diff --git a/cpp/src/cwrapper/tsfile_cwrapper.h b/cpp/src/cwrapper/tsfile_cwrapper.h
index d43e5dce..cfbfe8ef 100644
--- a/cpp/src/cwrapper/tsfile_cwrapper.h
+++ b/cpp/src/cwrapper/tsfile_cwrapper.h
@@ -156,7 +156,7 @@ TsFileWriter tsfile_writer_new(WriteFile file, TableSchema* schema,
  * @param schema       Table schema definition.
  *                     - Ownership: Should be free it by Caller.
  * @param memory_threshold  When the size of written data exceeds
- * this value, the data will be automatically flushed to the disk. 
+ * this value, the data will be automatically flushed to the disk.
  * @param err_code     [out] E_OK(0), or check error code in errno_define_c.h.
  *
  * @return TsFileWriter Valid handle on success, NULL on failure.
@@ -514,7 +514,8 @@ void free_write_file(WriteFile* write_file);
  *  Avoid use: No compatibility/existence guarantees. */
 
 // Create a tsfile writer.
-TsFileWriter _tsfile_writer_new(const char* pathname, ERRNO* err_code);
+TsFileWriter _tsfile_writer_new(const char* pathname, uint64_t memory_threshold,
+                                ERRNO* err_code);
 
 // Create a tablet with name, data_type and max_rows.
 Tablet _tablet_new_with_target_name(const char* device_id,
diff --git a/python/tsfile/tsfile_cpp.pxd b/python/tsfile/tsfile_cpp.pxd
index b4f9ccf8..3749aa7a 100644
--- a/python/tsfile/tsfile_cpp.pxd
+++ b/python/tsfile/tsfile_cpp.pxd
@@ -17,7 +17,7 @@
 #
 
 #cython: language_level=3
-from libc.stdint cimport uint32_t, int32_t, int64_t
+from libc.stdint cimport uint32_t, int32_t, int64_t, uint64_t
 
 ctypedef int32_t ErrorCode
 
@@ -109,7 +109,8 @@ cdef extern from "./tsfile_cwrapper.h":
     ErrorCode tsfile_reader_close(TsFileReader reader)
 
     # writer: new and close
-    TsFileWriter _tsfile_writer_new(const char * pathname, ErrorCode * err_code);
+    TsFileWriter _tsfile_writer_new(const char * pathname, uint64_t memory_threshold,
+                                    ErrorCode * err_code);
     ErrorCode _tsfile_writer_close(TsFileWriter writer);
 
     # writer : register table, device and timeseries
diff --git a/python/tsfile/tsfile_py_cpp.pxd b/python/tsfile/tsfile_py_cpp.pxd
index be1d5290..c7310d7c 100644
--- a/python/tsfile/tsfile_py_cpp.pxd
+++ b/python/tsfile/tsfile_py_cpp.pxd
@@ -40,7 +40,7 @@ cdef public api void free_c_timeseries_schema(TimeseriesSchema* c_schema)
 cdef public api void free_c_device_schema(DeviceSchema* c_schema)
 cdef public api void free_c_tablet(Tablet tablet)
 cdef public api void free_c_row_record(TsRecord record)
-cdef public api TsFileWriter tsfile_writer_new_c(object pathname) except +
+cdef public api TsFileWriter tsfile_writer_new_c(object pathname, uint64_t memory_threshold) except +
 cdef public api TsFileReader tsfile_reader_new_c(object pathname) except +
 cdef public api ErrorCode tsfile_writer_register_device_py_cpp(TsFileWriter writer, DeviceSchema *schema)
 cdef public api ErrorCode tsfile_writer_register_timeseries_py_cpp(TsFileWriter writer, object device_name,
diff --git a/python/tsfile/tsfile_py_cpp.pyx b/python/tsfile/tsfile_py_cpp.pyx
index 649c0755..90b12b4a 100644
--- a/python/tsfile/tsfile_py_cpp.pyx
+++ b/python/tsfile/tsfile_py_cpp.pyx
@@ -329,12 +329,12 @@ cdef void free_c_row_record(TsRecord record):
     _free_tsfile_ts_record(&record)
 
 # Reader and writer new.
-cdef TsFileWriter tsfile_writer_new_c(object pathname) except +:
+cdef TsFileWriter tsfile_writer_new_c(object pathname, uint64_t memory_threshold) except +:
     cdef ErrorCode errno = 0
     cdef TsFileWriter writer
     cdef bytes encoded_path = PyUnicode_AsUTF8String(pathname)
     cdef const char* c_path = encoded_path
-    writer = _tsfile_writer_new(c_path, &errno)
+    writer = _tsfile_writer_new(c_path, memory_threshold, &errno)
     check_error(errno)
     return writer
 
diff --git a/python/tsfile/tsfile_writer.pyx b/python/tsfile/tsfile_writer.pyx
index a670e5cb..4ceecd3c 100644
--- a/python/tsfile/tsfile_writer.pyx
+++ b/python/tsfile/tsfile_writer.pyx
@@ -29,8 +29,8 @@ from tsfile.tablet import Tablet as TabletPy
 cdef class TsFileWriterPy:
     cdef TsFileWriter writer
 
-    def __init__(self, pathname):
-        self.writer = tsfile_writer_new_c(pathname)
+    def __init__(self, pathname, memory_threshold = 128 * 1024 * 1024):
+        self.writer = tsfile_writer_new_c(pathname, memory_threshold)
 
     def register_timeseries(self, device_name : str, timeseries_schema : TimeseriesSchemaPy):
         """

Reply via email to