This is an automated email from the ASF dual-hosted git repository. colinlee pushed a commit to branch support_arrow_struct in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit a878ad56eb0c17ffd75c50864f9b51ac63016a07 Author: ColinLee <[email protected]> AuthorDate: Sun Feb 1 23:46:00 2026 +0800 tmp code. support arrow. --- cpp/CMakeLists.txt | 2 +- cpp/src/common/tsblock/vector/vector.h | 4 + cpp/src/cwrapper/CMakeLists.txt | 2 +- cpp/src/cwrapper/arrow_c.cc | 761 +++++++++++++++++++++ cpp/src/cwrapper/tsfile_cwrapper.cc | 51 ++ cpp/src/cwrapper/tsfile_cwrapper.h | 65 +- cpp/src/reader/qds_with_timegenerator.h | 1 + cpp/src/reader/qds_without_timegenerator.h | 1 + cpp/src/reader/result_set.h | 4 + cpp/src/reader/table_query_executor.h | 14 +- cpp/src/reader/table_result_set.cc | 34 + cpp/src/reader/table_result_set.h | 10 +- cpp/src/reader/tsfile_reader.cc | 14 +- cpp/src/reader/tsfile_reader.h | 5 +- cpp/test/common/tsblock/arrow_tsblock_test.cc | 334 +++++++++ .../table_view/tsfile_reader_table_batch_test.cc | 475 +++++++++++++ cpp/third_party/zlib-1.3.1/treebuild.xml | 188 +++-- .../zlib-1.3.1/zlib-1.3.1/treebuild.xml | 188 +++-- python/Untitled | 1 + python/lower_case_name.tsfile | Bin 0 -> 23089 bytes python/requirements.txt | 2 + python/setup.py | 16 +- python/test1.tsfile | Bin 0 -> 23089 bytes python/tests/bench_batch_arrow_vs_dataframe.py | 264 +++++++ python/tests/test_batch_arrow.py | 444 ++++++++++++ python/tsfile/tsfile_cpp.pxd | 35 + python/tsfile/tsfile_py_cpp.pxd | 2 + python/tsfile/tsfile_py_cpp.pyx | 27 + python/tsfile/tsfile_reader.pyx | 47 ++ python/tsfile/utils.py | 1 + 30 files changed, 2780 insertions(+), 212 deletions(-) diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index c85150d8f..fc35de43c 100755 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -111,7 +111,7 @@ endif () option(BUILD_TEST "Build tests" ON) message("cmake using: BUILD_TEST=${BUILD_TEST}") -option(ENABLE_ANTLR4 "Enable ANTLR4 runtime" ON) +option(ENABLE_ANTLR4 "Enable ANTLR4 runtime" OFF) message("cmake using: ENABLE_ANTLR4=${ENABLE_ANTLR4}") option(ENABLE_SNAPPY "Enable Google Snappy compression" ON) diff --git 
a/cpp/src/common/tsblock/vector/vector.h b/cpp/src/common/tsblock/vector/vector.h index 20a765967..37a96c543 100644 --- a/cpp/src/common/tsblock/vector/vector.h +++ b/cpp/src/common/tsblock/vector/vector.h @@ -78,6 +78,10 @@ class Vector { FORCE_INLINE bool has_null() { return has_null_; } + FORCE_INLINE common::BitMap& get_bitmap() { return nulls_; } + + FORCE_INLINE common::ByteBuffer& get_value_data() { return values_; } + // We want derived class to have access to base class members, so it is // defined as protected protected: diff --git a/cpp/src/cwrapper/CMakeLists.txt b/cpp/src/cwrapper/CMakeLists.txt index 07f52eb33..d62250bf0 100644 --- a/cpp/src/cwrapper/CMakeLists.txt +++ b/cpp/src/cwrapper/CMakeLists.txt @@ -18,7 +18,7 @@ under the License. ]] message("Running in cwrapper directory") set(CMAKE_POSITION_INDEPENDENT_CODE ON) -set(CWRAPPER_SRC_LIST tsfile_cwrapper.cc) +set(CWRAPPER_SRC_LIST tsfile_cwrapper.cc arrow_c.cc) add_library(cwrapper_obj OBJECT ${CWRAPPER_SRC_LIST}) # install header files diff --git a/cpp/src/cwrapper/arrow_c.cc b/cpp/src/cwrapper/arrow_c.cc new file mode 100644 index 000000000..e7f6c2de5 --- /dev/null +++ b/cpp/src/cwrapper/arrow_c.cc @@ -0,0 +1,761 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <cstring> +#include <ctime> +#include <type_traits> +#include <vector> + +#include "common/allocator/alloc_base.h" +#include "common/tsblock/tsblock.h" +#include "common/tsblock/tuple_desc.h" +#include "common/tsblock/vector/vector.h" +#include "cwrapper/tsfile_cwrapper.h" +#include "utils/errno_define.h" + +namespace arrow { + +#define ARROW_FLAG_DICTIONARY_ORDERED 1 +#define ARROW_FLAG_NULLABLE 2 +#define ARROW_FLAG_MAP_KEYS_SORTED 4 + +struct ArrowArrayData { + void** buffers; + size_t n_buffers; +}; + +struct ArrowSchemaData { + std::vector<std::string>* format_strings; + std::vector<std::string>* name_strings; + ArrowSchema** children; + size_t n_children; +}; + +struct StructArrayData { + ArrowArray** children; + size_t n_children; +}; + +static const char* GetArrowFormatString(common::TSDataType datatype) { + switch (datatype) { + case common::BOOLEAN: + return "b"; + case common::INT32: + return "i"; + case common::INT64: + return "l"; + case common::TIMESTAMP: // nanosecond, no timezone + return "tsn:"; + case common::FLOAT: + return "f"; + case common::DOUBLE: + return "g"; + case common::TEXT: + case common::STRING: + return "u"; + case common::DATE: + return "tdD"; // date32: days since Unix epoch, stored as int32 + default: + return nullptr; + } +} + +static inline size_t GetNullBitmapSize(int64_t length) { + return (length + 7) / 8; +} + +static void ReleaseArrowArray(ArrowArray* array) { + if (array == nullptr || array->private_data == nullptr) { + return; + } + ArrowArrayData* data = static_cast<ArrowArrayData*>(array->private_data); + if (data->buffers != nullptr) { + for (size_t i = 0; i < data->n_buffers; ++i) { + if (data->buffers[i] != nullptr) { + common::mem_free(data->buffers[i]); + } + } + common::mem_free(data->buffers); + } + common::mem_free(data); + + array->length = 0; + array->null_count = 0; + array->offset = 0; 
+ array->n_buffers = 0; + array->n_children = 0; + array->buffers = nullptr; + array->children = nullptr; + array->dictionary = nullptr; + array->release = nullptr; + array->private_data = nullptr; +} + +static void ReleaseStructArrowArray(ArrowArray* array) { + if (array == nullptr || array->private_data == nullptr) { + return; + } + StructArrayData* data = static_cast<StructArrayData*>(array->private_data); + if (data->children != nullptr) { + for (size_t i = 0; i < data->n_children; ++i) { + if (data->children[i] != nullptr) { + if (data->children[i]->release != nullptr) { + data->children[i]->release(data->children[i]); + } + common::mem_free(data->children[i]); + } + } + common::mem_free(data->children); + } + delete data; + + array->length = 0; + array->null_count = 0; + array->offset = 0; + array->n_buffers = 0; + array->n_children = 0; + array->buffers = nullptr; + array->children = nullptr; + array->dictionary = nullptr; + array->release = nullptr; + array->private_data = nullptr; +} + +static void ReleaseArrowSchema(ArrowSchema* schema) { + if (schema == nullptr || schema->private_data == nullptr) { + return; + } + ArrowSchemaData* data = static_cast<ArrowSchemaData*>(schema->private_data); + + // Release children schemas first + if (data->children != nullptr) { + for (size_t i = 0; i < data->n_children; ++i) { + if (data->children[i] != nullptr) { + if (data->children[i]->release != nullptr) { + data->children[i]->release(data->children[i]); + } + common::mem_free(data->children[i]); + } + } + common::mem_free(data->children); + } + + // Release string storage + if (data->format_strings != nullptr) { + delete data->format_strings; + } + if (data->name_strings != nullptr) { + delete data->name_strings; + } + + delete data; + + schema->format = nullptr; + schema->name = nullptr; + schema->metadata = nullptr; + schema->flags = 0; + schema->n_children = 0; + schema->children = nullptr; + schema->dictionary = nullptr; + schema->release = nullptr; + 
schema->private_data = nullptr; +} + +template <typename CType> +inline int BuildFixedLengthArrowArrayC(common::Vector* vec, uint32_t row_count, + ArrowArray* out_array) { + if (vec == nullptr || out_array == nullptr || row_count == 0) { + return common::E_INVALID_ARG; + } + + bool has_null = vec->has_null(); + size_t type_size = sizeof(CType); + // Arrow C Data Interface: fixed-width types always have 2 buffers + // buffers[0] = validity bitmap (may be NULL if no nulls) + // buffers[1] = values + static constexpr int64_t n_buffers = 2; + + ArrowArrayData* array_data = static_cast<ArrowArrayData*>( + common::mem_alloc(sizeof(ArrowArrayData), common::MOD_TSBLOCK)); + if (array_data == nullptr) { + return common::E_OOM; + } + + array_data->n_buffers = n_buffers; + array_data->buffers = static_cast<void**>( + common::mem_alloc(n_buffers * sizeof(void*), common::MOD_TSBLOCK)); + if (array_data->buffers == nullptr) { + common::mem_free(array_data); + return common::E_OOM; + } + + for (int64_t i = 0; i < n_buffers; ++i) { + array_data->buffers[i] = nullptr; + } + + uint8_t* null_bitmap = nullptr; + if (has_null) { + size_t null_bitmap_size = GetNullBitmapSize(row_count); + null_bitmap = static_cast<uint8_t*>( + common::mem_alloc(null_bitmap_size, common::MOD_TSBLOCK)); + if (null_bitmap == nullptr) { + common::mem_free(array_data->buffers); + common::mem_free(array_data); + return common::E_OOM; + } + common::BitMap& vec_bitmap = vec->get_bitmap(); + char* vec_bitmap_data = vec_bitmap.get_bitmap(); + for (size_t i = 0; i < null_bitmap_size; ++i) { + null_bitmap[i] = ~static_cast<uint8_t>(vec_bitmap_data[i]); + } + array_data->buffers[0] = null_bitmap; + + int64_t null_count = 0; + for (uint32_t i = 0; i < row_count; ++i) { + if (vec_bitmap.test(i)) { + null_count++; + } + } + out_array->null_count = null_count; + } else { + array_data->buffers[0] = nullptr; + out_array->null_count = 0; + } + + char* vec_data = vec->get_value_data().get_data(); + void* data_buffer = 
nullptr; + + if (std::is_same<CType, bool>::value) { + size_t packed_size = GetNullBitmapSize(row_count); + uint8_t* packed_buffer = static_cast<uint8_t*>( + common::mem_alloc(packed_size, common::MOD_TSBLOCK)); + if (packed_buffer == nullptr) { + if (null_bitmap != nullptr) { + common::mem_free(null_bitmap); + } + common::mem_free(array_data->buffers); + common::mem_free(array_data); + return common::E_OOM; + } + + std::memset(packed_buffer, 0, packed_size); + + const uint8_t* src = reinterpret_cast<const uint8_t*>(vec_data); + for (uint32_t i = 0; i < row_count; ++i) { + if (src[i] != 0) { + uint32_t byte_idx = i / 8; + uint32_t bit_idx = i % 8; + packed_buffer[byte_idx] |= (1 << bit_idx); + } + } + + data_buffer = packed_buffer; + } else { + size_t data_size = type_size * row_count; + data_buffer = common::mem_alloc(data_size, common::MOD_TSBLOCK); + if (data_buffer == nullptr) { + if (null_bitmap != nullptr) { + common::mem_free(null_bitmap); + } + common::mem_free(array_data->buffers); + common::mem_free(array_data); + return common::E_OOM; + } + std::memcpy(data_buffer, vec_data, data_size); + } + + array_data->buffers[1] = data_buffer; + + out_array->length = row_count; + out_array->offset = 0; + out_array->n_buffers = n_buffers; + out_array->n_children = 0; + out_array->buffers = const_cast<const void**>(array_data->buffers); + out_array->children = nullptr; + out_array->dictionary = nullptr; + out_array->release = ReleaseArrowArray; + out_array->private_data = array_data; + + return common::E_OK; +} + +static int BuildStringArrowArrayC(common::Vector* vec, uint32_t row_count, + ArrowArray* out_array) { + if (vec == nullptr || out_array == nullptr || row_count == 0) { + return common::E_INVALID_ARG; + } + + bool has_null = vec->has_null(); + int64_t n_buffers = 3; + ArrowArrayData* array_data = static_cast<ArrowArrayData*>( + common::mem_alloc(sizeof(ArrowArrayData), common::MOD_TSBLOCK)); + if (array_data == nullptr) { + return common::E_OOM; + } + + 
array_data->n_buffers = n_buffers; + array_data->buffers = static_cast<void**>( + common::mem_alloc(n_buffers * sizeof(void*), common::MOD_TSBLOCK)); + if (array_data->buffers == nullptr) { + common::mem_free(array_data); + return common::E_OOM; + } + + for (int64_t i = 0; i < n_buffers; ++i) { + array_data->buffers[i] = nullptr; + } + + uint8_t* null_bitmap = nullptr; + if (has_null) { + size_t null_bitmap_size = GetNullBitmapSize(row_count); + null_bitmap = static_cast<uint8_t*>( + common::mem_alloc(null_bitmap_size, common::MOD_TSBLOCK)); + if (null_bitmap == nullptr) { + common::mem_free(array_data->buffers); + common::mem_free(array_data); + return common::E_OOM; + } + common::BitMap& vec_bitmap = vec->get_bitmap(); + char* vec_bitmap_data = vec_bitmap.get_bitmap(); + for (size_t i = 0; i < null_bitmap_size; ++i) { + null_bitmap[i] = ~static_cast<uint8_t>(vec_bitmap_data[i]); + } + array_data->buffers[0] = null_bitmap; + + int64_t null_count = 0; + for (uint32_t i = 0; i < row_count; ++i) { + if (vec_bitmap.test(i)) { + null_count++; + } + } + out_array->null_count = null_count; + } else { + array_data->buffers[0] = nullptr; + out_array->null_count = 0; + } + size_t offsets_size = sizeof(int32_t) * (row_count + 1); + int32_t* offsets = static_cast<int32_t*>( + common::mem_alloc(offsets_size, common::MOD_TSBLOCK)); + if (offsets == nullptr) { + if (null_bitmap != nullptr) { + common::mem_free(null_bitmap); + } + common::mem_free(array_data->buffers); + common::mem_free(array_data); + return common::E_OOM; + } + + offsets[0] = 0; + uint32_t current_offset = 0; + char* vec_data = vec->get_value_data().get_data(); + uint32_t vec_offset = 0; + + // 获取 vec_bitmap 用于后续检查 + common::BitMap& vec_bitmap = vec->get_bitmap(); + + for (uint32_t i = 0; i < row_count; ++i) { + if (has_null && vec_bitmap.test(i)) { + offsets[i + 1] = current_offset; + } else { + uint32_t len = 0; + std::memcpy(&len, vec_data + vec_offset, sizeof(uint32_t)); + vec_offset += sizeof(uint32_t); + 
+ current_offset += len; + offsets[i + 1] = current_offset; + vec_offset += len; + } + } + + array_data->buffers[1] = offsets; + + size_t data_size = current_offset; + uint8_t* data_buffer = static_cast<uint8_t*>( + common::mem_alloc(data_size, common::MOD_TSBLOCK)); + if (data_buffer == nullptr) { + if (null_bitmap != nullptr) { + common::mem_free(null_bitmap); + } + common::mem_free(offsets); + common::mem_free(array_data->buffers); + common::mem_free(array_data); + return common::E_OOM; + } + + vec_offset = 0; + uint32_t data_offset = 0; + for (uint32_t i = 0; i < row_count; ++i) { + if (!has_null || !vec_bitmap.test(i)) { + uint32_t len = 0; + std::memcpy(&len, vec_data + vec_offset, sizeof(uint32_t)); + vec_offset += sizeof(uint32_t); + + if (len > 0) { + std::memcpy(data_buffer + data_offset, vec_data + vec_offset, + len); + data_offset += len; + } + vec_offset += len; + } + } + + array_data->buffers[2] = data_buffer; + + out_array->length = row_count; + out_array->offset = 0; + out_array->n_buffers = n_buffers; + out_array->n_children = 0; + out_array->buffers = const_cast<const void**>(array_data->buffers); + out_array->children = nullptr; + out_array->dictionary = nullptr; + out_array->release = ReleaseArrowArray; + out_array->private_data = array_data; + + return common::E_OK; +} + +// Convert TsFile YYYYMMDD integer to days since Unix epoch (1970-01-01) +static int32_t YYYYMMDDToDaysSinceEpoch(int32_t yyyymmdd) { + int year = yyyymmdd / 10000; + int month = (yyyymmdd % 10000) / 100; + int day = yyyymmdd % 100; + + std::tm date = {}; + date.tm_year = year - 1900; + date.tm_mon = month - 1; + date.tm_mday = day; + date.tm_hour = 12; + date.tm_isdst = -1; + + std::tm epoch = {}; + epoch.tm_year = 70; + epoch.tm_mon = 0; + epoch.tm_mday = 1; + epoch.tm_hour = 12; + epoch.tm_isdst = -1; + + time_t t1 = mktime(&date); + time_t t2 = mktime(&epoch); + return static_cast<int32_t>((t1 - t2) / (60 * 60 * 24)); +} + +static int BuildDateArrowArrayC(common::Vector* 
vec, uint32_t row_count, + ArrowArray* out_array) { + if (vec == nullptr || out_array == nullptr || row_count == 0) { + return common::E_INVALID_ARG; + } + + bool has_null = vec->has_null(); + static constexpr int64_t n_buffers = 2; + + ArrowArrayData* array_data = static_cast<ArrowArrayData*>( + common::mem_alloc(sizeof(ArrowArrayData), common::MOD_TSBLOCK)); + if (array_data == nullptr) return common::E_OOM; + + array_data->n_buffers = n_buffers; + array_data->buffers = static_cast<void**>( + common::mem_alloc(n_buffers * sizeof(void*), common::MOD_TSBLOCK)); + if (array_data->buffers == nullptr) { + common::mem_free(array_data); + return common::E_OOM; + } + for (int64_t i = 0; i < n_buffers; ++i) array_data->buffers[i] = nullptr; + + common::BitMap& vec_bitmap = vec->get_bitmap(); + uint8_t* null_bitmap = nullptr; + if (has_null) { + size_t null_bitmap_size = GetNullBitmapSize(row_count); + null_bitmap = static_cast<uint8_t*>( + common::mem_alloc(null_bitmap_size, common::MOD_TSBLOCK)); + if (null_bitmap == nullptr) { + common::mem_free(array_data->buffers); + common::mem_free(array_data); + return common::E_OOM; + } + char* vec_bitmap_data = vec_bitmap.get_bitmap(); + for (size_t i = 0; i < null_bitmap_size; ++i) { + null_bitmap[i] = ~static_cast<uint8_t>(vec_bitmap_data[i]); + } + int64_t null_count = 0; + for (uint32_t i = 0; i < row_count; ++i) { + if (vec_bitmap.test(i)) null_count++; + } + out_array->null_count = null_count; + array_data->buffers[0] = null_bitmap; + } else { + out_array->null_count = 0; + array_data->buffers[0] = nullptr; + } + + int32_t* data_buffer = static_cast<int32_t*>( + common::mem_alloc(sizeof(int32_t) * row_count, common::MOD_TSBLOCK)); + if (data_buffer == nullptr) { + if (null_bitmap) common::mem_free(null_bitmap); + common::mem_free(array_data->buffers); + common::mem_free(array_data); + return common::E_OOM; + } + + char* vec_data = vec->get_value_data().get_data(); + for (uint32_t i = 0; i < row_count; ++i) { + if (has_null 
&& vec_bitmap.test(i)) { + data_buffer[i] = 0; + } else { + int32_t yyyymmdd = 0; + std::memcpy(&yyyymmdd, vec_data + i * sizeof(int32_t), + sizeof(int32_t)); + data_buffer[i] = YYYYMMDDToDaysSinceEpoch(yyyymmdd); + } + } + + array_data->buffers[1] = data_buffer; + out_array->length = row_count; + out_array->offset = 0; + out_array->n_buffers = n_buffers; + out_array->n_children = 0; + out_array->buffers = const_cast<const void**>(array_data->buffers); + out_array->children = nullptr; + out_array->dictionary = nullptr; + out_array->release = ReleaseArrowArray; + out_array->private_data = array_data; + return common::E_OK; +} + +// Helper function to build ArrowArray for a single column +static int BuildColumnArrowArray(common::Vector* vec, uint32_t row_count, + ArrowArray* out_array) { + if (vec == nullptr || out_array == nullptr || row_count == 0) { + return common::E_INVALID_ARG; + } + + common::TSDataType data_type = vec->get_vector_type(); + const char* format = GetArrowFormatString(data_type); + if (format == nullptr) { + return common::E_TYPE_NOT_SUPPORTED; + } + + int ret = common::E_OK; + switch (data_type) { + case common::BOOLEAN: + ret = BuildFixedLengthArrowArrayC<bool>(vec, row_count, out_array); + break; + case common::INT32: + ret = + BuildFixedLengthArrowArrayC<int32_t>(vec, row_count, out_array); + break; + case common::DATE: + ret = BuildDateArrowArrayC(vec, row_count, out_array); + break; + case common::INT64: + case common::TIMESTAMP: + ret = + BuildFixedLengthArrowArrayC<int64_t>(vec, row_count, out_array); + break; + case common::FLOAT: + ret = BuildFixedLengthArrowArrayC<float>(vec, row_count, out_array); + break; + case common::DOUBLE: + ret = + BuildFixedLengthArrowArrayC<double>(vec, row_count, out_array); + break; + case common::TEXT: + case common::STRING: + ret = BuildStringArrowArrayC(vec, row_count, out_array); + break; + default: + return common::E_TYPE_NOT_SUPPORTED; + } + return ret; +} + +// Build ArrowSchema for a single column 
+static int BuildColumnArrowSchema(common::TSDataType data_type, + const std::string& column_name, + ArrowSchema* out_schema) { + if (out_schema == nullptr) { + return common::E_INVALID_ARG; + } + + const char* format = GetArrowFormatString(data_type); + if (format == nullptr) { + return common::E_TYPE_NOT_SUPPORTED; + } + + ArrowSchemaData* schema_data = new ArrowSchemaData(); + schema_data->format_strings = new std::vector<std::string>(); + schema_data->name_strings = new std::vector<std::string>(); + schema_data->children = nullptr; + schema_data->n_children = 0; + + schema_data->format_strings->push_back(format); + schema_data->name_strings->push_back(column_name); + + out_schema->format = schema_data->format_strings->back().c_str(); + out_schema->name = schema_data->name_strings->back().c_str(); + out_schema->metadata = nullptr; + out_schema->flags = ARROW_FLAG_NULLABLE; + out_schema->n_children = 0; + out_schema->children = nullptr; + out_schema->dictionary = nullptr; + out_schema->release = ReleaseArrowSchema; + out_schema->private_data = schema_data; + + return common::E_OK; +} + +int TsBlockToArrowStruct(common::TsBlock& tsblock, ArrowArray* out_array, + ArrowSchema* out_schema) { + if (out_array == nullptr || out_schema == nullptr) { + return common::E_INVALID_ARG; + } + + uint32_t row_count = tsblock.get_row_count(); + uint32_t column_count = tsblock.get_column_count(); + common::TupleDesc* tuple_desc = tsblock.get_tuple_desc(); + + if (row_count == 0 || column_count == 0) { + return common::E_INVALID_ARG; + } + + // Build ArrowSchema for struct type + ArrowSchemaData* schema_data = new ArrowSchemaData(); + schema_data->format_strings = new std::vector<std::string>(); + schema_data->name_strings = new std::vector<std::string>(); + schema_data->n_children = column_count; + schema_data->children = static_cast<ArrowSchema**>(common::mem_alloc( + column_count * sizeof(ArrowSchema*), common::MOD_TSBLOCK)); + if (schema_data->children == nullptr) { + delete 
schema_data->format_strings; + delete schema_data->name_strings; + delete schema_data; + return common::E_OOM; + } + + // Store format string for struct type + schema_data->format_strings->push_back("+s"); + schema_data->name_strings->push_back(""); + + // Build schema for each column + for (uint32_t i = 0; i < column_count; ++i) { + schema_data->children[i] = static_cast<ArrowSchema*>( + common::mem_alloc(sizeof(ArrowSchema), common::MOD_TSBLOCK)); + if (schema_data->children[i] == nullptr) { + for (uint32_t j = 0; j < i; ++j) { + if (schema_data->children[j] != nullptr && + schema_data->children[j]->release != nullptr) { + schema_data->children[j]->release(schema_data->children[j]); + } + } + common::mem_free(schema_data->children); + delete schema_data->format_strings; + delete schema_data->name_strings; + delete schema_data; + return common::E_OOM; + } + + common::TSDataType col_type = tuple_desc->get_column_type(i); + std::string col_name = tuple_desc->get_column_name(i); + + int ret = BuildColumnArrowSchema(col_type, col_name, + schema_data->children[i]); + if (ret != common::E_OK) { + for (uint32_t j = 0; j <= i; ++j) { + if (schema_data->children[j] != nullptr && + schema_data->children[j]->release != nullptr) { + schema_data->children[j]->release(schema_data->children[j]); + } + } + common::mem_free(schema_data->children); + delete schema_data->format_strings; + delete schema_data->name_strings; + delete schema_data; + return ret; + } + } + + out_schema->format = schema_data->format_strings->at(0).c_str(); + out_schema->name = schema_data->name_strings->at(0).c_str(); + out_schema->metadata = nullptr; + out_schema->flags = 0; + out_schema->n_children = column_count; + out_schema->children = schema_data->children; + out_schema->dictionary = nullptr; + out_schema->release = ReleaseArrowSchema; + out_schema->private_data = schema_data; + + ArrowArray** children_arrays = static_cast<ArrowArray**>(common::mem_alloc( + column_count * sizeof(ArrowArray*), 
common::MOD_TSBLOCK)); + if (children_arrays == nullptr) { + ReleaseArrowSchema(out_schema); + return common::E_OOM; + } + + for (uint32_t i = 0; i < column_count; ++i) { + children_arrays[i] = static_cast<ArrowArray*>( + common::mem_alloc(sizeof(ArrowArray), common::MOD_TSBLOCK)); + if (children_arrays[i] == nullptr) { + for (uint32_t j = 0; j < i; ++j) { + if (children_arrays[j] != nullptr && + children_arrays[j]->release != nullptr) { + children_arrays[j]->release(children_arrays[j]); + } + } + common::mem_free(children_arrays); + ReleaseArrowSchema(out_schema); + return common::E_OOM; + } + + common::Vector* vec = tsblock.get_vector(i); + int ret = BuildColumnArrowArray(vec, row_count, children_arrays[i]); + if (ret != common::E_OK) { + for (uint32_t j = 0; j <= i; ++j) { + if (children_arrays[j] != nullptr && + children_arrays[j]->release != nullptr) { + children_arrays[j]->release(children_arrays[j]); + } + } + common::mem_free(children_arrays); + ReleaseArrowSchema(out_schema); + return ret; + } + } + + StructArrayData* struct_data = new StructArrayData(); + struct_data->children = children_arrays; + struct_data->n_children = column_count; + + // Arrow C Data Interface: struct type requires n_buffers = 1 (validity + // bitmap) buffers[0] may be NULL if there are no nulls at the struct level + static const void* struct_buffers[1] = {nullptr}; + + out_array->length = row_count; + out_array->null_count = 0; // struct itself is never null + out_array->offset = 0; + out_array->n_buffers = 1; + out_array->n_children = column_count; + out_array->buffers = struct_buffers; + out_array->children = children_arrays; + out_array->dictionary = nullptr; + out_array->release = ReleaseStructArrowArray; + out_array->private_data = struct_data; + + return common::E_OK; +} + +} // namespace arrow diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc b/cpp/src/cwrapper/tsfile_cwrapper.cc index fbcf4e6f1..298f27f0a 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.cc +++ 
b/cpp/src/cwrapper/tsfile_cwrapper.cc @@ -24,13 +24,21 @@ #include <unistd.h> #include <writer/tsfile_table_writer.h> +#include <cstring> #include <set> #include "common/tablet.h" #include "reader/result_set.h" +#include "reader/table_result_set.h" #include "reader/tsfile_reader.h" #include "writer/tsfile_writer.h" +// Forward declaration for arrow namespace function (defined in arrow_c.cc) +namespace arrow { +int TsBlockToArrowStruct(common::TsBlock& tsblock, ArrowArray* out_array, + ArrowSchema* out_schema); +} + #ifdef __cplusplus extern "C" { #endif @@ -361,6 +369,21 @@ ResultSet tsfile_query_table_on_tree(TsFileReader reader, char** columns, return table_result_set; } +ResultSet tsfile_query_table_batch(TsFileReader reader, const char* table_name, + char** columns, uint32_t column_num, + Timestamp start_time, Timestamp end_time, + int batch_size, ERRNO* err_code) { + auto* r = static_cast<storage::TsFileReader*>(reader); + storage::ResultSet* table_result_set = nullptr; + std::vector<std::string> column_names; + for (uint32_t i = 0; i < column_num; i++) { + column_names.emplace_back(columns[i]); + } + *err_code = r->query(table_name, column_names, start_time, end_time, + table_result_set, batch_size); + return table_result_set; +} + bool tsfile_result_set_next(ResultSet result_set, ERRNO* err_code) { auto* r = static_cast<storage::ResultSet*>(result_set); bool has_next = true; @@ -373,6 +396,34 @@ bool tsfile_result_set_next(ResultSet result_set, ERRNO* err_code) { return has_next; } +ERRNO tsfile_result_set_get_next_tsblock_as_arrow(ResultSet result_set, + ArrowArray* out_array, + ArrowSchema* out_schema) { + if (result_set == nullptr || out_array == nullptr || + out_schema == nullptr) { + return common::E_INVALID_ARG; + } + + auto* r = static_cast<storage::ResultSet*>(result_set); + auto* table_result_set = dynamic_cast<storage::TableResultSet*>(r); + if (table_result_set == nullptr) { + return common::E_INVALID_ARG; + } + + common::TsBlock* tsblock = 
nullptr; + int ret = table_result_set->get_next_tsblock(tsblock); + if (ret != common::E_OK) { + return ret; + } + + if (tsblock == nullptr) { + return common::E_NO_MORE_DATA; + } + + ret = arrow::TsBlockToArrowStruct(*tsblock, out_array, out_schema); + return ret; +} + #define TSFILE_RESULT_SET_GET_VALUE_BY_NAME_DEF(type) \ type tsfile_result_set_get_value_by_name_##type(ResultSet result_set, \ const char* column_name) { \ diff --git a/cpp/src/cwrapper/tsfile_cwrapper.h b/cpp/src/cwrapper/tsfile_cwrapper.h index 643b4e52b..b04e32c26 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.h +++ b/cpp/src/cwrapper/tsfile_cwrapper.h @@ -20,6 +20,7 @@ #ifndef SRC_CWRAPPER_TSFILE_CWRAPPER_H_ #define SRC_CWRAPPER_TSFILE_CWRAPPER_H_ #ifdef __cplusplus + extern "C" { #endif @@ -124,6 +125,39 @@ typedef void* TsRecord; typedef void* ResultSet; +typedef struct arrow_schema { + // Array type description + const char* format; + const char* name; + const char* metadata; + int64_t flags; + int64_t n_children; + struct arrow_schema** children; + struct arrow_schema* dictionary; + + // Release callback + void (*release)(struct arrow_schema*); + // Opaque producer-specific data + void* private_data; +} ArrowSchema; + +typedef struct arrow_array { + // Array data description + int64_t length; + int64_t null_count; + int64_t offset; + int64_t n_buffers; + int64_t n_children; + const void** buffers; + struct arrow_array** children; + struct arrow_array* dictionary; + + // Release callback + void (*release)(struct arrow_array*); + // Opaque producer-specific data + void* private_data; +} ArrowArray; + typedef int32_t ERRNO; typedef int64_t Timestamp; @@ -444,11 +478,15 @@ ResultSet tsfile_query_table(TsFileReader reader, const char* table_name, ResultSet tsfile_query_table_on_tree(TsFileReader reader, char** columns, uint32_t column_num, Timestamp start_time, Timestamp end_time, ERRNO* err_code); +ResultSet tsfile_query_table_batch(TsFileReader reader, const char* table_name, + char** columns, 
uint32_t column_num, + Timestamp start_time, Timestamp end_time, + int batch_size, ERRNO* err_code); // ResultSet tsfile_reader_query_device(TsFileReader reader, // const char* device_name, -// char** sensor_name, uint32_t sensor_num, -// Timestamp start_time, Timestamp -// end_time); +// char** sensor_name, uint32_t +// sensor_num, Timestamp start_time, +// Timestamp end_time); /** * @brief Check and fetch the next row in the ResultSet. @@ -458,6 +496,27 @@ ResultSet tsfile_query_table_on_tree(TsFileReader reader, char** columns, */ bool tsfile_result_set_next(ResultSet result_set, ERRNO* error_code); +/** + * @brief Gets the next TsBlock from batch ResultSet and converts it to Arrow + * format. + * + * @param result_set [in] Valid ResultSet handle from batch query + * (tsfile_query_table_batch). + * @param out_array [out] Pointer to ArrowArray pointer. Will be set to the + * converted Arrow array. + * @param out_schema [out] Pointer to ArrowSchema pointer. Will be set to the + * converted Arrow schema. + * @return ERRNO - E_OK(0) on success, E_NO_MORE_DATA if no more blocks, or + * other error codes. + * @note Caller should release ArrowArray and ArrowSchema by calling their + * release callbacks when done. + * @note This function should only be called on ResultSet obtained from + * tsfile_query_table_batch with batch_size > 0. + */ +ERRNO tsfile_result_set_get_next_tsblock_as_arrow(ResultSet result_set, + ArrowArray* out_array, + ArrowSchema* out_schema); + /** * @brief Gets value from current row by column name (generic types). 
* diff --git a/cpp/src/reader/qds_with_timegenerator.h b/cpp/src/reader/qds_with_timegenerator.h index 52892df14..c0651f0b1 100644 --- a/cpp/src/reader/qds_with_timegenerator.h +++ b/cpp/src/reader/qds_with_timegenerator.h @@ -123,6 +123,7 @@ class QDSWithTimeGenerator : public ResultSet { bool is_null(uint32_t column_index); RowRecord* get_row_record(); std::shared_ptr<ResultSetMetadata> get_metadata(); + int get_next_tsblock(common::TsBlock*& block) { return common::E_OK; }; private: int construct_node_tree(Expression* expr, Node*& node); diff --git a/cpp/src/reader/qds_without_timegenerator.h b/cpp/src/reader/qds_without_timegenerator.h index 0619fa673..f949e04b5 100644 --- a/cpp/src/reader/qds_without_timegenerator.h +++ b/cpp/src/reader/qds_without_timegenerator.h @@ -48,6 +48,7 @@ class QDSWithoutTimeGenerator : public ResultSet { bool is_null(uint32_t column_index); RowRecord* get_row_record(); std::shared_ptr<ResultSetMetadata> get_metadata(); + int get_next_tsblock(common::TsBlock*& block) { return common::E_OK; }; private: int get_next_tsblock(uint32_t index, bool alloc_mem); diff --git a/cpp/src/reader/result_set.h b/cpp/src/reader/result_set.h index 87303cef4..26f3a3fa9 100644 --- a/cpp/src/reader/result_set.h +++ b/cpp/src/reader/result_set.h @@ -25,6 +25,7 @@ #include <unordered_map> #include "common/row_record.h" +#include "common/tsblock/tsblock.h" namespace storage { /** @@ -155,6 +156,9 @@ class ResultSet : std::enable_shared_from_this<ResultSet> { ASSERT(column_index >= 0 && column_index < row_record->get_col_num()); return row_record->get_field(column_index)->get_value<T>(); } + + virtual int get_next_tsblock(common::TsBlock*& block) = 0; + /** * @brief Get the row record of the result set * diff --git a/cpp/src/reader/table_query_executor.h b/cpp/src/reader/table_query_executor.h index 974e6b45b..76427d7e8 100644 --- a/cpp/src/reader/table_query_executor.h +++ b/cpp/src/reader/table_query_executor.h @@ -44,13 +44,20 @@ class TableQueryExecutor 
{ : meta_data_querier_(meta_data_querier), tsfile_io_reader_(tsfile_io_reader), table_query_ordering_(table_query_ordering), - block_size_(block_size) {} - TableQueryExecutor(ReadFile* read_file) { + block_size_(block_size), + batch_mode_(false) {} + TableQueryExecutor(ReadFile* read_file, const int batch_size = 0) { tsfile_io_reader_ = new TsFileIOReader(); tsfile_io_reader_->init(read_file); meta_data_querier_ = new MetadataQuerier(tsfile_io_reader_); table_query_ordering_ = TableQueryOrdering::DEVICE; - block_size_ = 1024; + if (batch_size == 0) { + block_size_ = 1024; + batch_mode_ = false; + } else { + block_size_ = batch_size; + batch_mode_ = true; + } } ~TableQueryExecutor() { if (meta_data_querier_ != nullptr) { @@ -76,6 +83,7 @@ class TableQueryExecutor { TsFileIOReader* tsfile_io_reader_; TableQueryOrdering table_query_ordering_; int32_t block_size_; + bool batch_mode_; }; } // namespace storage diff --git a/cpp/src/reader/table_result_set.cc b/cpp/src/reader/table_result_set.cc index aeeefb463..9a4281f12 100644 --- a/cpp/src/reader/table_result_set.cc +++ b/cpp/src/reader/table_result_set.cc @@ -37,7 +37,12 @@ void TableResultSet::init() { TableResultSet::~TableResultSet() { close(); } int TableResultSet::next(bool& has_next) { + if (batch_mode_) { + return tsblock_reader_->has_next(has_next); + } + int ret = common::E_OK; + while (row_iterator_ == nullptr || !row_iterator_->has_next()) { if (RET_FAIL(tsblock_reader_->has_next(has_next))) { return ret; @@ -103,6 +108,35 @@ std::shared_ptr<ResultSetMetadata> TableResultSet::get_metadata() { return result_set_metadata_; } +int TableResultSet::get_next_tsblock(common::TsBlock*& block) { + int ret = common::E_OK; + block = nullptr; + + if (!batch_mode_) { + return common::E_INVALID_ARG; + } + + bool has_next = false; + if (RET_FAIL(tsblock_reader_->has_next(has_next))) { + return common::E_NO_MORE_DATA; + } + + if (!has_next) { + return common::E_NO_MORE_DATA; + } + + if 
(RET_FAIL(tsblock_reader_->next(tsblock_))) { + return common::E_NO_MORE_DATA; + } + + if (tsblock_ == nullptr) { + return common::E_NO_MORE_DATA; + } + + block = tsblock_; + return common::E_OK; +} + void TableResultSet::close() { tsblock_reader_->close(); pa_.destroy(); diff --git a/cpp/src/reader/table_result_set.h b/cpp/src/reader/table_result_set.h index 4192f7c2f..251c8029c 100644 --- a/cpp/src/reader/table_result_set.h +++ b/cpp/src/reader/table_result_set.h @@ -28,19 +28,22 @@ class TableResultSet : public ResultSet { public: explicit TableResultSet(std::unique_ptr<TsBlockReader> tsblock_reader, std::vector<std::string> column_names, - std::vector<common::TSDataType> data_types) + std::vector<common::TSDataType> data_types, + bool batch_mode = false) : tsblock_reader_(std::move(tsblock_reader)), column_names_(column_names), - data_types_(data_types) { + data_types_(data_types), + batch_mode_(batch_mode) { init(); } - ~TableResultSet(); + ~TableResultSet() override; int next(bool& has_next) override; bool is_null(const std::string& column_name) override; bool is_null(uint32_t column_index) override; RowRecord* get_row_record() override; std::shared_ptr<ResultSetMetadata> get_metadata() override; void close() override; + int get_next_tsblock(common::TsBlock*& block) override; private: void init(); @@ -51,6 +54,7 @@ class TableResultSet : public ResultSet { std::vector<std::unique_ptr<TsBlockReader>> tsblock_readers_; std::vector<std::string> column_names_; std::vector<common::TSDataType> data_types_; + const bool batch_mode_; }; } // namespace storage #endif // TABLE_RESULT_SET_H \ No newline at end of file diff --git a/cpp/src/reader/tsfile_reader.cc b/cpp/src/reader/tsfile_reader.cc index f97570885..436f262e8 100644 --- a/cpp/src/reader/tsfile_reader.cc +++ b/cpp/src/reader/tsfile_reader.cc @@ -42,7 +42,6 @@ int TsFileReader::open(const std::string& file_path) { } else if (RET_FAIL(tsfile_executor_->init(read_file_))) { std::cout << "filed to init " << ret 
<< std::endl; } - table_query_executor_ = new storage::TableQueryExecutor(read_file_); return ret; } @@ -87,15 +86,16 @@ int TsFileReader::query(std::vector<std::string>& path_list, int64_t start_time, int TsFileReader::query(const std::string& table_name, const std::vector<std::string>& columns_names, int64_t start_time, int64_t end_time, - ResultSet*& result_set) { + ResultSet*& result_set, int batch_size) { return this->query(table_name, columns_names, start_time, end_time, - result_set, nullptr); + result_set, nullptr, batch_size); } int TsFileReader::query(const std::string& table_name, const std::vector<std::string>& columns_names, int64_t start_time, int64_t end_time, - ResultSet*& result_set, Filter* tag_filter) { + ResultSet*& result_set, Filter* tag_filter, + int batch_size) { int ret = E_OK; TsFileMeta* tsfile_meta = tsfile_executor_->get_tsfile_meta(); if (tsfile_meta == nullptr) { @@ -108,6 +108,9 @@ int TsFileReader::query(const std::string& table_name, } Filter* time_filter = new TimeBetween(start_time, end_time, false); + if (table_query_executor_ == nullptr) { + table_query_executor_ = new TableQueryExecutor(read_file_, batch_size); + } ret = table_query_executor_->query(to_lower(table_name), columns_names, time_filter, tag_filter, nullptr, result_set); @@ -185,6 +188,9 @@ int TsFileReader::query_table_on_tree( columns_names[i] = "col_" + std::to_string(i); } Filter* time_filter = new TimeBetween(star_time, end_time, false); + if (table_query_executor_ == nullptr) { + table_query_executor_ = new TableQueryExecutor(read_file_); + } ret = table_query_executor_->query_on_tree( satisfied_device_ids, columns_names, measurement_names_to_query, time_filter, result_set); diff --git a/cpp/src/reader/tsfile_reader.h b/cpp/src/reader/tsfile_reader.h index 8a6ba2264..526a96351 100644 --- a/cpp/src/reader/tsfile_reader.h +++ b/cpp/src/reader/tsfile_reader.h @@ -95,7 +95,7 @@ class TsFileReader { */ int query(const std::string& table_name, const 
std::vector<std::string>& columns_names, int64_t start_time, - int64_t end_time, ResultSet*& result_set); + int64_t end_time, ResultSet*& result_set, int batch_size = 0); /** * @brief query the tsfile by the table name, columns names, start time @@ -111,7 +111,8 @@ class TsFileReader { */ int query(const std::string& table_name, const std::vector<std::string>& columns_names, int64_t start_time, - int64_t end_time, ResultSet*& result_set, Filter* tag_filter); + int64_t end_time, ResultSet*& result_set, Filter* tag_filter, + int batch_size = 0); int query_table_on_tree(const std::vector<std::string>& measurement_names, int64_t star_time, int64_t end_time, diff --git a/cpp/test/common/tsblock/arrow_tsblock_test.cc b/cpp/test/common/tsblock/arrow_tsblock_test.cc new file mode 100644 index 000000000..123efb59f --- /dev/null +++ b/cpp/test/common/tsblock/arrow_tsblock_test.cc @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +#include <gtest/gtest.h> + +#include <cstring> + +#include "common/tsblock/tsblock.h" +#include "cwrapper/tsfile_cwrapper.h" +#include "utils/db_utils.h" + +// Forward declarations for arrow namespace (functions are defined in +// arrow_c.cc) +namespace arrow { +// Type aliases for Arrow types (defined in tsfile_cwrapper.h) +using ArrowArray = ::ArrowArray; +using ArrowSchema = ::ArrowSchema; +#define ARROW_FLAG_DICTIONARY_ORDERED 1 +#define ARROW_FLAG_NULLABLE 2 +#define ARROW_FLAG_MAP_KEYS_SORTED 4 + +// Function declaration (defined in arrow_c.cc) +int TsBlockToArrowStruct(common::TsBlock& tsblock, ArrowArray* out_array, + ArrowSchema* out_schema); +} // namespace arrow + +static void VerifyArrowSchema( + const ::arrow::ArrowSchema* schema, + const std::vector<std::string>& expected_names, + const std::vector<const char*>& expected_formats) { + ASSERT_NE(schema, nullptr); + EXPECT_STREQ(schema->format, "+s"); + EXPECT_EQ(schema->n_children, expected_names.size()); + ASSERT_NE(schema->children, nullptr); + + for (size_t i = 0; i < expected_names.size(); ++i) { + const arrow::ArrowSchema* child = schema->children[i]; + ASSERT_NE(child, nullptr); + EXPECT_STREQ(child->name, expected_names[i].c_str()); + EXPECT_STREQ(child->format, expected_formats[i]); + EXPECT_EQ(child->flags, ARROW_FLAG_NULLABLE); + } +} + +static void VerifyArrowArrayData(const arrow::ArrowArray* array, + uint32_t expected_length) { + ASSERT_NE(array, nullptr); + EXPECT_EQ(array->length, expected_length); + EXPECT_EQ(array->n_children, 3); + ASSERT_NE(array->children, nullptr); +} + +TEST(ArrowTsBlockTest, NormalTsBlock_NoNulls) { + common::TupleDesc tuple_desc; + common::ColumnSchema col1("int_col", common::INT32, common::SNAPPY, + common::RLE); + common::ColumnSchema col2("double_col", common::DOUBLE, common::SNAPPY, + common::RLE); + common::ColumnSchema col3("string_col", common::STRING, common::SNAPPY, + common::RLE); + tuple_desc.push_back(col1); + tuple_desc.push_back(col2); + 
tuple_desc.push_back(col3); + + common::TsBlock tsblock(&tuple_desc, 10); + ASSERT_EQ(tsblock.init(), common::E_OK); + + common::RowAppender row_appender(&tsblock); + + for (int i = 0; i < 5; ++i) { + ASSERT_TRUE(row_appender.add_row()); + + int32_t int_val = 100 + i; + row_appender.append(0, reinterpret_cast<const char*>(&int_val), + sizeof(int32_t)); + double double_val = 3.14 + i; + row_appender.append(1, reinterpret_cast<const char*>(&double_val), + sizeof(double)); + std::string str_val = "test" + std::to_string(i); + row_appender.append(2, str_val.c_str(), str_val.length()); + } + + EXPECT_EQ(tsblock.get_row_count(), 5); + + arrow::ArrowArray array; + arrow::ArrowSchema schema; + int ret = arrow::TsBlockToArrowStruct(tsblock, &array, &schema); + ASSERT_EQ(ret, common::E_OK); + + std::vector<std::string> expected_names = {"int_col", "double_col", + "string_col"}; + std::vector<const char*> expected_formats = {"i", "g", "u"}; + VerifyArrowSchema(&schema, expected_names, expected_formats); + + VerifyArrowArrayData(&array, 5); + + ASSERT_NE(array.children, nullptr); + ASSERT_NE(array.children[0], nullptr); + ASSERT_NE(array.children[1], nullptr); + ASSERT_NE(array.children[2], nullptr); + + const ArrowArray* int_array = array.children[0]; + EXPECT_EQ(int_array->length, 5); + EXPECT_EQ(int_array->null_count, 0); + ASSERT_NE(int_array->buffers, nullptr); + const int32_t* int_data = reinterpret_cast<const int32_t*>( + int_array->buffers[int_array->n_buffers - 1]); + for (int i = 0; i < 5; ++i) { + EXPECT_EQ(int_data[i], 100 + i); + } + + const arrow::ArrowArray* double_array = array.children[1]; + EXPECT_EQ(double_array->length, 5); + EXPECT_EQ(double_array->null_count, 0); + const double* double_data = reinterpret_cast<const double*>( + double_array->buffers[double_array->n_buffers - 1]); + for (int i = 0; i < 5; ++i) { + EXPECT_DOUBLE_EQ(double_data[i], 3.14 + i); + } + const arrow::ArrowArray* string_array = array.children[2]; + EXPECT_EQ(string_array->length, 
5); + EXPECT_EQ(string_array->null_count, 0); + ASSERT_NE(string_array->buffers, nullptr); + const int32_t* offsets = + reinterpret_cast<const int32_t*>(string_array->buffers[1]); + const char* string_data = + reinterpret_cast<const char*>(string_array->buffers[2]); + + for (int i = 0; i < 5; ++i) { + int32_t start = offsets[i]; + int32_t end = offsets[i + 1]; + std::string expected_str = "test" + std::to_string(i); + std::string actual_str(string_data + start, end - start); + EXPECT_EQ(actual_str, expected_str); + } + + if (array.release != nullptr) { + array.release(&array); + } + if (schema.release != nullptr) { + schema.release(&schema); + } +} + +TEST(ArrowTsBlockTest, TsBlock_WithNulls) { + common::TupleDesc tuple_desc; + common::ColumnSchema col1("int_col", common::INT32, common::SNAPPY, + common::RLE); + common::ColumnSchema col2("double_col", common::DOUBLE, common::SNAPPY, + common::RLE); + common::ColumnSchema col3("string_col", common::STRING, common::SNAPPY, + common::RLE); + tuple_desc.push_back(col1); + tuple_desc.push_back(col2); + tuple_desc.push_back(col3); + + common::TsBlock tsblock(&tuple_desc, 10); + ASSERT_EQ(tsblock.init(), common::E_OK); + + common::RowAppender row_appender(&tsblock); + for (int i = 0; i < 5; ++i) { + ASSERT_TRUE(row_appender.add_row()); + + if (i == 1) { + row_appender.append_null(0); + row_appender.append_null(1); + row_appender.append_null(2); + } else if (i == 3) { + row_appender.append_null(0); + double double_val = 3.14 + i; + row_appender.append(1, reinterpret_cast<const char*>(&double_val), + sizeof(double)); + std::string str_val = "test" + std::to_string(i); + row_appender.append(2, str_val.c_str(), str_val.length()); + } else { + int32_t int_val = 100 + i; + row_appender.append(0, reinterpret_cast<const char*>(&int_val), + sizeof(int32_t)); + double double_val = 3.14 + i; + row_appender.append(1, reinterpret_cast<const char*>(&double_val), + sizeof(double)); + std::string str_val = "test" + std::to_string(i); + 
row_appender.append(2, str_val.c_str(), str_val.length()); + } + } + + EXPECT_EQ(tsblock.get_row_count(), 5); + + arrow::ArrowArray array; + arrow::ArrowSchema schema; + int ret = arrow::TsBlockToArrowStruct(tsblock, &array, &schema); + ASSERT_EQ(ret, common::E_OK); + + std::vector<std::string> expected_names = {"int_col", "double_col", + "string_col"}; + std::vector<const char*> expected_formats = {"i", "g", "u"}; + VerifyArrowSchema(&schema, expected_names, expected_formats); + + VerifyArrowArrayData(&array, 5); + + const arrow::ArrowArray* int_array = array.children[0]; + EXPECT_EQ(int_array->null_count, 2); + + const arrow::ArrowArray* double_array = array.children[1]; + EXPECT_EQ(double_array->null_count, 1); + + const arrow::ArrowArray* string_array = array.children[2]; + EXPECT_EQ(string_array->null_count, 1); + + ASSERT_NE(int_array->buffers[0], nullptr); + const uint8_t* null_bitmap = + reinterpret_cast<const uint8_t*>(int_array->buffers[0]); + EXPECT_FALSE(null_bitmap[0] & (1 << 1)); + EXPECT_FALSE(null_bitmap[0] & (1 << 3)); + EXPECT_TRUE(null_bitmap[0] & (1 << 0)); + EXPECT_TRUE(null_bitmap[0] & (1 << 2)); + EXPECT_TRUE(null_bitmap[0] & (1 << 4)); + + const int32_t* int_data = reinterpret_cast<const int32_t*>( + int_array->buffers[int_array->n_buffers - 1]); + EXPECT_NE(int_data, nullptr); + if (array.release != nullptr) { + array.release(&array); + } + if (schema.release != nullptr) { + schema.release(&schema); + } +} + +TEST(ArrowTsBlockTest, TsBlock_EdgeCases) { + { + common::TupleDesc tuple_desc; + common::ColumnSchema col1("single_col", common::INT64, common::SNAPPY, + common::RLE); + tuple_desc.push_back(col1); + + common::TsBlock tsblock(&tuple_desc, 5); + ASSERT_EQ(tsblock.init(), common::E_OK); + + common::RowAppender row_appender(&tsblock); + for (int i = 0; i < 3; ++i) { + ASSERT_TRUE(row_appender.add_row()); + int64_t val = 1000 + i; + row_appender.append(0, reinterpret_cast<const char*>(&val), + sizeof(int64_t)); + } + + arrow::ArrowArray 
array; + arrow::ArrowSchema schema; + int ret = arrow::TsBlockToArrowStruct(tsblock, &array, &schema); + ASSERT_EQ(ret, common::E_OK); + + EXPECT_STREQ(schema.format, "+s"); + EXPECT_EQ(schema.n_children, 1); + EXPECT_STREQ(schema.children[0]->name, "single_col"); + EXPECT_STREQ(schema.children[0]->format, "l"); + + EXPECT_EQ(array.length, 3); + EXPECT_EQ(array.n_children, 1); + if (array.release != nullptr) { + array.release(&array); + } + if (schema.release != nullptr) { + schema.release(&schema); + } + } + + { + common::TupleDesc tuple_desc; + common::ColumnSchema col1("int_col", common::INT32, common::SNAPPY, + common::RLE); + common::ColumnSchema col2("double_col", common::DOUBLE, common::SNAPPY, + common::RLE); + tuple_desc.push_back(col1); + tuple_desc.push_back(col2); + + const int row_count = 1000; + common::TsBlock tsblock(&tuple_desc, row_count); + ASSERT_EQ(tsblock.init(), common::E_OK); + + common::RowAppender row_appender(&tsblock); + for (int i = 0; i < row_count; ++i) { + ASSERT_TRUE(row_appender.add_row()); + int32_t int_val = i; + row_appender.append(0, reinterpret_cast<const char*>(&int_val), + sizeof(int32_t)); + double double_val = i * 0.5; + row_appender.append(1, reinterpret_cast<const char*>(&double_val), + sizeof(double)); + } + + arrow::ArrowArray array; + arrow::ArrowSchema schema; + int ret = arrow::TsBlockToArrowStruct(tsblock, &array, &schema); + ASSERT_EQ(ret, common::E_OK); + + EXPECT_EQ(array.length, row_count); + EXPECT_EQ(array.n_children, 2); + + const arrow::ArrowArray* int_array = array.children[0]; + const int32_t* int_data = + reinterpret_cast<const int32_t*>(int_array->buffers[1]); + EXPECT_EQ(int_data[0], 0); + EXPECT_EQ(int_data[row_count - 1], row_count - 1); + + const arrow::ArrowArray* double_array = array.children[1]; + const double* double_data = + reinterpret_cast<const double*>(double_array->buffers[1]); + EXPECT_DOUBLE_EQ(double_data[0], 0.0); + EXPECT_DOUBLE_EQ(double_data[row_count - 1], (row_count - 1) * 0.5); + 
+ if (array.release != nullptr) { + array.release(&array); + } + if (schema.release != nullptr) { + schema.release(&schema); + } + } +} diff --git a/cpp/test/reader/table_view/tsfile_reader_table_batch_test.cc b/cpp/test/reader/table_view/tsfile_reader_table_batch_test.cc new file mode 100644 index 000000000..ece8ea7bf --- /dev/null +++ b/cpp/test/reader/table_view/tsfile_reader_table_batch_test.cc @@ -0,0 +1,475 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +#include <gtest/gtest.h> + +#include <chrono> +#include <random> +#include <vector> + +#include "common/record.h" +#include "common/schema.h" +#include "common/tablet.h" +#include "file/tsfile_io_writer.h" +#include "file/write_file.h" +#include "reader/table_result_set.h" +#include "reader/tsfile_reader.h" +#include "writer/chunk_writer.h" +#include "writer/tsfile_table_writer.h" + +using namespace storage; +using namespace common; + +class TsFileTableReaderBatchTest : public ::testing::Test { + protected: + void SetUp() override { + libtsfile_init(); + file_name_ = std::string("tsfile_reader_table_batch_test_") + + generate_random_string(10) + std::string(".tsfile"); + remove(file_name_.c_str()); + int flags = O_WRONLY | O_CREAT | O_TRUNC; +#ifdef _WIN32 + flags |= O_BINARY; +#endif + mode_t mode = 0666; + write_file_.create(file_name_, flags, mode); + } + void TearDown() override { + remove(file_name_.c_str()); + libtsfile_destroy(); + } + std::string file_name_; + WriteFile write_file_; + + public: + static std::string generate_random_string(int length) { + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> dis(0, 61); + + const std::string chars = + "0123456789" + "abcdefghijklmnopqrstuvwxyz" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + + std::string random_string; + + for (int i = 0; i < length; ++i) { + random_string += chars[dis(gen)]; + } + + return random_string; + } + + static TableSchema* gen_table_schema_no_tag() { + // Generate table schema with only FIELD columns (no TAG columns) + std::vector<MeasurementSchema*> measurement_schemas; + std::vector<ColumnCategory> column_categories; + int measurement_schema_num = 5; // 5 field columns + for (int i = 0; i < measurement_schema_num; i++) { + measurement_schemas.emplace_back(new MeasurementSchema( + "s" + std::to_string(i), TSDataType::INT64, TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + column_categories.emplace_back(ColumnCategory::FIELD); + } + return new 
TableSchema("testTableNoTag", measurement_schemas, + column_categories); + } + + static storage::Tablet gen_tablet_no_tag(TableSchema* table_schema, + int num_rows) { + // Generate tablet without tags (only field columns) + storage::Tablet tablet(table_schema->get_table_name(), + table_schema->get_measurement_names(), + table_schema->get_data_types(), + table_schema->get_column_categories(), num_rows); + + for (int i = 0; i < num_rows; i++) { + tablet.add_timestamp(i, i); + auto column_schemas = table_schema->get_measurement_schemas(); + for (const auto& column_schema : column_schemas) { + if (column_schema->data_type_ == TSDataType::INT64) { + tablet.add_value(i, column_schema->measurement_name_, + static_cast<int64_t>(i)); + } + } + } + return tablet; + } + + static TableSchema* gen_table_schema() { + std::vector<MeasurementSchema*> measurement_schemas; + std::vector<ColumnCategory> column_categories; + int id_schema_num = 2; + int measurement_schema_num = 3; + for (int i = 0; i < id_schema_num; i++) { + measurement_schemas.emplace_back(new MeasurementSchema( + "id" + std::to_string(i), TSDataType::STRING, TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + column_categories.emplace_back(ColumnCategory::TAG); + } + for (int i = 0; i < measurement_schema_num; i++) { + measurement_schemas.emplace_back(new MeasurementSchema( + "s" + std::to_string(i), TSDataType::INT64, TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + column_categories.emplace_back(ColumnCategory::FIELD); + } + return new TableSchema("testTable", measurement_schemas, + column_categories); + } + + static storage::Tablet gen_tablet(TableSchema* table_schema, int offset, + int device_num, + int num_timestamp_per_device = 10) { + storage::Tablet tablet(table_schema->get_table_name(), + table_schema->get_measurement_names(), + table_schema->get_data_types(), + table_schema->get_column_categories(), + device_num * num_timestamp_per_device); + + char* literal = new char[std::strlen("device_id") 
+ 1]; + std::strcpy(literal, "device_id"); + String literal_str(literal, std::strlen("device_id")); + for (int i = 0; i < device_num; i++) { + for (int l = 0; l < num_timestamp_per_device; l++) { + int row_index = i * num_timestamp_per_device + l; + tablet.add_timestamp(row_index, row_index); + auto column_schemas = table_schema->get_measurement_schemas(); + for (const auto& column_schema : column_schemas) { + switch (column_schema->data_type_) { + case TSDataType::INT64: + tablet.add_value(row_index, + column_schema->measurement_name_, + static_cast<int64_t>(i)); + break; + case TSDataType::STRING: + tablet.add_value(row_index, + column_schema->measurement_name_, + literal_str); + break; + default: + break; + } + } + } + } + delete[] literal; + return tablet; + } +}; + +TEST_F(TsFileTableReaderBatchTest, BatchQueryWithSmallBatchSize) { + auto table_schema = gen_table_schema(); + auto tsfile_table_writer_ = + std::make_shared<TsFileTableWriter>(&write_file_, table_schema); + + const int device_num = 2; + const int points_per_device = 50; + auto tablet = gen_tablet(table_schema, 0, device_num, points_per_device); + ASSERT_EQ(tsfile_table_writer_->write_table(tablet), common::E_OK); + ASSERT_EQ(tsfile_table_writer_->flush(), common::E_OK); + ASSERT_EQ(tsfile_table_writer_->close(), common::E_OK); + + storage::TsFileReader reader; + int ret = reader.open(file_name_); + ASSERT_EQ(ret, common::E_OK); + + ResultSet* tmp_result_set = nullptr; + const int batch_size = 20; + ret = reader.query(table_schema->get_table_name(), + table_schema->get_measurement_names(), 0, 1000000000000, + tmp_result_set, batch_size); + ASSERT_EQ(ret, common::E_OK); + ASSERT_NE(tmp_result_set, nullptr); + + auto* table_result_set = dynamic_cast<TableResultSet*>(tmp_result_set); + ASSERT_NE(table_result_set, nullptr); + + int total_rows = 0; + int block_count = 0; + common::TsBlock* block = nullptr; + + char* literal = new char[std::strlen("device_id") + 1]; + std::strcpy(literal, "device_id"); + 
String expected_string(literal, std::strlen("device_id")); + std::vector<int64_t> int64_sums(3, 0); + std::cout << "begin to start" << std::endl; + while ((ret = table_result_set->get_next_tsblock(block)) == common::E_OK) { + ASSERT_NE(block, nullptr); + block_count++; + uint32_t row_count = block->get_row_count(); + total_rows += row_count; + ASSERT_EQ(row_count, batch_size); + + common::RowIterator row_iterator(block); + while (row_iterator.has_next()) { + uint32_t len; + bool null; + + int int64_col_idx = 0; + for (uint32_t col_idx = 1; + col_idx < row_iterator.get_column_count(); ++col_idx) { + const char* value = row_iterator.read(col_idx, &len, &null); + ASSERT_FALSE(null); + TSDataType data_type = row_iterator.get_data_type(col_idx); + if (data_type == TSDataType::INT64) { + int64_t int_val = *reinterpret_cast<const int64_t*>(value); + int64_sums[int64_col_idx] += int_val; + int64_col_idx++; + std::cout << "to add" << int_val << std::endl; + } else if (data_type == TSDataType::STRING) { + String str_value(value, len); + ASSERT_EQ(str_value.compare(expected_string), 0); + } + } + row_iterator.next(); + } + } + std::cout << "finish with ret" << ret << std::endl; + std::cout << "check finished" << std::endl; + EXPECT_EQ(total_rows, device_num * points_per_device); + EXPECT_GT(block_count, 1); + for (size_t i = 0; i < int64_sums.size(); i++) { + EXPECT_EQ(int64_sums[i], 50); + } + + delete[] literal; + + reader.destroy_query_data_set(table_result_set); + ASSERT_EQ(reader.close(), common::E_OK); + delete table_schema; +} + +TEST_F(TsFileTableReaderBatchTest, BatchQueryWithLargeBatchSize) { + auto table_schema = gen_table_schema(); + auto tsfile_table_writer_ = + std::make_shared<TsFileTableWriter>(&write_file_, table_schema); + + const int device_num = 1; + const int points_per_device = 120; + auto tablet = gen_tablet(table_schema, 0, device_num, points_per_device); + ASSERT_EQ(tsfile_table_writer_->write_table(tablet), common::E_OK); + 
ASSERT_EQ(tsfile_table_writer_->flush(), common::E_OK); + ASSERT_EQ(tsfile_table_writer_->close(), common::E_OK); + + storage::TsFileReader reader; + int ret = reader.open(file_name_); + ASSERT_EQ(ret, common::E_OK); + + ResultSet* tmp_result_set = nullptr; + const int batch_size = 100; + ret = reader.query(table_schema->get_table_name(), + table_schema->get_measurement_names(), 0, 1000000000000, + tmp_result_set, batch_size); + ASSERT_EQ(ret, common::E_OK); + ASSERT_NE(tmp_result_set, nullptr); + + auto* table_result_set = dynamic_cast<TableResultSet*>(tmp_result_set); + ASSERT_NE(table_result_set, nullptr); + + int total_rows = 0; + int block_count = 0; + common::TsBlock* block = nullptr; + + while (table_result_set->get_next_tsblock(block) == common::E_OK) { + ASSERT_NE(block, nullptr); + block_count++; + uint32_t row_count = block->get_row_count(); + total_rows += row_count; + + ASSERT_EQ(row_count, block_count == 1 ? batch_size : 20); + } + + EXPECT_EQ(total_rows, device_num * points_per_device); + EXPECT_GE(block_count, 2); + + reader.destroy_query_data_set(table_result_set); + ASSERT_EQ(reader.close(), common::E_OK); + delete table_schema; +} + +TEST_F(TsFileTableReaderBatchTest, BatchQueryVerifyDataCorrectness) { + auto table_schema = gen_table_schema(); + auto tsfile_table_writer_ = + std::make_shared<TsFileTableWriter>(&write_file_, table_schema); + + const int device_num = 1; + const int points_per_device = 30; + auto tablet = gen_tablet(table_schema, 0, device_num, points_per_device); + ASSERT_EQ(tsfile_table_writer_->write_table(tablet), common::E_OK); + ASSERT_EQ(tsfile_table_writer_->flush(), common::E_OK); + ASSERT_EQ(tsfile_table_writer_->close(), common::E_OK); + + storage::TsFileReader reader; + int ret = reader.open(file_name_); + ASSERT_EQ(ret, common::E_OK); + + ResultSet* tmp_result_set = nullptr; + const int batch_size = 10; + ret = reader.query(table_schema->get_table_name(), + table_schema->get_measurement_names(), 0, 1000000000000, + 
tmp_result_set, batch_size); + ASSERT_EQ(ret, common::E_OK); + ASSERT_NE(tmp_result_set, nullptr); + + auto* table_result_set = dynamic_cast<TableResultSet*>(tmp_result_set); + ASSERT_NE(table_result_set, nullptr); + + int expected_timestamp = 0; + common::TsBlock* block = nullptr; + + while (table_result_set->get_next_tsblock(block) == common::E_OK) { + ASSERT_NE(block, nullptr); + + common::RowIterator row_iterator(block); + while (row_iterator.has_next()) { + uint32_t len; + bool null; + int64_t timestamp = *reinterpret_cast<const int64_t*>( + row_iterator.read(0, &len, &null)); + ASSERT_FALSE(null); + EXPECT_EQ(timestamp, expected_timestamp); + + for (uint32_t col_idx = 2; + col_idx < row_iterator.get_column_count(); ++col_idx) { + const char* value = row_iterator.read(col_idx, &len, &null); + if (!null && row_iterator.get_data_type(col_idx) == INT64) { + int64_t int_val = *reinterpret_cast<const int64_t*>(value); + EXPECT_EQ(int_val, 0); + } + } + row_iterator.next(); + expected_timestamp++; + } + } + + EXPECT_EQ(expected_timestamp, device_num * points_per_device); + + reader.destroy_query_data_set(table_result_set); + ASSERT_EQ(reader.close(), common::E_OK); + delete table_schema; +} + +TEST_F(TsFileTableReaderBatchTest, PerformanceComparisonSinglePointVsBatch) { + // Create table schema without tags (only fields) + auto table_schema = gen_table_schema_no_tag(); + auto tsfile_table_writer_ = + std::make_shared<TsFileTableWriter>(&write_file_, table_schema); + + // Write a large amount of data + const int total_rows = 1000000; + auto tablet = gen_tablet_no_tag(table_schema, total_rows); + ASSERT_EQ(tsfile_table_writer_->write_table(tablet), common::E_OK); + ASSERT_EQ(tsfile_table_writer_->flush(), common::E_OK); + ASSERT_EQ(tsfile_table_writer_->close(), common::E_OK); + + // Test 1: Single point query (using next() method) + { + storage::TsFileReader reader; + int ret = reader.open(file_name_); + ASSERT_EQ(ret, common::E_OK); + + ResultSet* tmp_result_set = 
nullptr; + // Single point query: don't specify batch_size (or use 0) + auto start_time = std::chrono::high_resolution_clock::now(); + + ret = reader.query(table_schema->get_table_name(), + table_schema->get_measurement_names(), 0, + 1000000000000, tmp_result_set); + ASSERT_EQ(ret, common::E_OK); + ASSERT_NE(tmp_result_set, nullptr); + + auto* table_result_set = dynamic_cast<TableResultSet*>(tmp_result_set); + ASSERT_NE(table_result_set, nullptr); + + int total_rows_read = 0; + bool has_next = false; + + // Use next() method for single point query + while (IS_SUCC(table_result_set->next(has_next)) && has_next) { + total_rows_read++; + } + + auto end_time = std::chrono::high_resolution_clock::now(); + auto duration = std::chrono::duration_cast<std::chrono::milliseconds>( + end_time - start_time); + + EXPECT_EQ(total_rows_read, total_rows); + std::cout << "\n=== Single Point Query (using next() method) ===" + << std::endl; + std::cout << "Total rows read: " << total_rows_read << std::endl; + std::cout << "Time taken: " << duration.count() << " ms" << std::endl; + std::cout << "Throughput: " + << (total_rows_read * 5 * 1000.0 / duration.count()) + << " rows/sec" << std::endl; + + reader.destroy_query_data_set(table_result_set); + ASSERT_EQ(reader.close(), common::E_OK); + } + + // // Test 2: Batch query (batch_size = 1000) + // { + // storage::TsFileReader reader; + // int ret = reader.open(file_name_); + // ASSERT_EQ(ret, common::E_OK); + // + // ResultSet* tmp_result_set = nullptr; + // const int batch_size = 10000; // Batch query + // auto start_time = std::chrono::high_resolution_clock::now(); + // + // ret = reader.query(table_schema->get_table_name(), + // table_schema->get_measurement_names(), 0, + // 1000000000000, tmp_result_set, batch_size); + // ASSERT_EQ(ret, common::E_OK); + // ASSERT_NE(tmp_result_set, nullptr); + // + // auto* table_result_set = + // dynamic_cast<TableResultSet*>(tmp_result_set); + // ASSERT_NE(table_result_set, nullptr); + // + // int 
total_rows_read = 0; + // common::TsBlock* block = nullptr; + // int block_count = 0; + // + // while ((ret = table_result_set->get_next_tsblock(block)) == + // common::E_OK) { + // ASSERT_NE(block, nullptr); + // block_count++; + // total_rows_read += block->get_row_count(); + // } + // + // auto end_time = std::chrono::high_resolution_clock::now(); + // auto duration = + // std::chrono::duration_cast<std::chrono::milliseconds>( + // end_time - start_time); + // + // EXPECT_EQ(total_rows_read, total_rows); + // std::cout << "\n=== Batch Query (batch_size=10000) ===" << std::endl; + // std::cout << "Total rows read: " << total_rows_read << std::endl; + // std::cout << "Block count: " << block_count << std::endl; + // std::cout << "Time taken: " << duration.count() << " ms" << + // std::endl; std::cout << "Throughput: " + // << (total_rows_read * 5 * 1000.0 / duration.count()) + // << " rows/sec" << std::endl; + // + // reader.destroy_query_data_set(table_result_set); + // ASSERT_EQ(reader.close(), common::E_OK); + // } + + delete table_schema; +} diff --git a/cpp/third_party/zlib-1.3.1/treebuild.xml b/cpp/third_party/zlib-1.3.1/treebuild.xml index 930b00be4..8e030572a 100644 --- a/cpp/third_party/zlib-1.3.1/treebuild.xml +++ b/cpp/third_party/zlib-1.3.1/treebuild.xml @@ -1,103 +1,99 @@ -<?xml version="1.0" ?> +<?xml version="1.0" encoding="UTF-8"?> <package name="zlib" version="1.3.1"> <library name="zlib" dlversion="1.3.1" dlname="z"> - <property name="description"> zip compression library </property> - <property name="include-target-dir" value="$(@PACKAGE/install-includedir)" /> - - <!-- fixme: not implemented yet --> - <property name="compiler/c/inline" value="yes" /> - - <include-file name="zlib.h" scope="public" mode="644" /> - <include-file name="zconf.h" scope="public" mode="644" /> - - <source name="adler32.c"> - <depend name="zlib.h" /> - <depend name="zconf.h" /> - </source> - <source name="compress.c"> - <depend name="zlib.h" /> - <depend name="zconf.h" 
/> - </source> - <source name="crc32.c"> - <depend name="zlib.h" /> - <depend name="zconf.h" /> - <depend name="crc32.h" /> - </source> - <source name="gzclose.c"> - <depend name="zlib.h" /> - <depend name="zconf.h" /> - <depend name="gzguts.h" /> - </source> - <source name="gzlib.c"> - <depend name="zlib.h" /> - <depend name="zconf.h" /> - <depend name="gzguts.h" /> - </source> - <source name="gzread.c"> - <depend name="zlib.h" /> - <depend name="zconf.h" /> - <depend name="gzguts.h" /> - </source> - <source name="gzwrite.c"> - <depend name="zlib.h" /> - <depend name="zconf.h" /> - <depend name="gzguts.h" /> - </source> - <source name="uncompr.c"> - <depend name="zlib.h" /> - <depend name="zconf.h" /> - </source> - <source name="deflate.c"> - <depend name="zlib.h" /> - <depend name="zconf.h" /> - <depend name="zutil.h" /> - <depend name="deflate.h" /> - </source> - <source name="trees.c"> - <depend name="zlib.h" /> - <depend name="zconf.h" /> - <depend name="zutil.h" /> - <depend name="deflate.h" /> - <depend name="trees.h" /> - </source> - <source name="zutil.c"> - <depend name="zlib.h" /> - <depend name="zconf.h" /> - <depend name="zutil.h" /> - </source> - <source name="inflate.c"> - <depend name="zlib.h" /> - <depend name="zconf.h" /> - <depend name="zutil.h" /> - <depend name="inftrees.h" /> - <depend name="inflate.h" /> - <depend name="inffast.h" /> - </source> - <source name="infback.c"> - <depend name="zlib.h" /> - <depend name="zconf.h" /> - <depend name="zutil.h" /> - <depend name="inftrees.h" /> - <depend name="inflate.h" /> - <depend name="inffast.h" /> - </source> - <source name="inftrees.c"> - <depend name="zlib.h" /> - <depend name="zconf.h" /> - <depend name="zutil.h" /> - <depend name="inftrees.h" /> - </source> - <source name="inffast.c"> - <depend name="zlib.h" /> - <depend name="zconf.h" /> - <depend name="zutil.h" /> - <depend name="inftrees.h" /> - <depend name="inflate.h" /> - <depend name="inffast.h" /> - </source> + <property 
name="description">zip compression library</property> + <property name="include-target-dir" value="$(@PACKAGE/install-includedir)"/> + <!-- fixme: not implemented yet --> + <property name="compiler/c/inline" value="yes"/> + <include-file name="zlib.h" scope="public" mode="644"/> + <include-file name="zconf.h" scope="public" mode="644"/> + <source name="adler32.c"> + <depend name="zlib.h"/> + <depend name="zconf.h"/> + </source> + <source name="compress.c"> + <depend name="zlib.h"/> + <depend name="zconf.h"/> + </source> + <source name="crc32.c"> + <depend name="zlib.h"/> + <depend name="zconf.h"/> + <depend name="crc32.h"/> + </source> + <source name="gzclose.c"> + <depend name="zlib.h"/> + <depend name="zconf.h"/> + <depend name="gzguts.h"/> + </source> + <source name="gzlib.c"> + <depend name="zlib.h"/> + <depend name="zconf.h"/> + <depend name="gzguts.h"/> + </source> + <source name="gzread.c"> + <depend name="zlib.h"/> + <depend name="zconf.h"/> + <depend name="gzguts.h"/> + </source> + <source name="gzwrite.c"> + <depend name="zlib.h"/> + <depend name="zconf.h"/> + <depend name="gzguts.h"/> + </source> + <source name="uncompr.c"> + <depend name="zlib.h"/> + <depend name="zconf.h"/> + </source> + <source name="deflate.c"> + <depend name="zlib.h"/> + <depend name="zconf.h"/> + <depend name="zutil.h"/> + <depend name="deflate.h"/> + </source> + <source name="trees.c"> + <depend name="zlib.h"/> + <depend name="zconf.h"/> + <depend name="zutil.h"/> + <depend name="deflate.h"/> + <depend name="trees.h"/> + </source> + <source name="zutil.c"> + <depend name="zlib.h"/> + <depend name="zconf.h"/> + <depend name="zutil.h"/> + </source> + <source name="inflate.c"> + <depend name="zlib.h"/> + <depend name="zconf.h"/> + <depend name="zutil.h"/> + <depend name="inftrees.h"/> + <depend name="inflate.h"/> + <depend name="inffast.h"/> + </source> + <source name="infback.c"> + <depend name="zlib.h"/> + <depend name="zconf.h"/> + <depend name="zutil.h"/> + <depend 
name="inftrees.h"/> + <depend name="inflate.h"/> + <depend name="inffast.h"/> + </source> + <source name="inftrees.c"> + <depend name="zlib.h"/> + <depend name="zconf.h"/> + <depend name="zutil.h"/> + <depend name="inftrees.h"/> + </source> + <source name="inffast.c"> + <depend name="zlib.h"/> + <depend name="zconf.h"/> + <depend name="zutil.h"/> + <depend name="inftrees.h"/> + <depend name="inflate.h"/> + <depend name="inffast.h"/> + </source> </library> </package> - <!-- CFLAGS=-O #CFLAGS=-O -DMAX_WBITS=14 -DMAX_MEM_LEVEL=7 diff --git a/cpp/third_party/zlib-1.3.1/zlib-1.3.1/treebuild.xml b/cpp/third_party/zlib-1.3.1/zlib-1.3.1/treebuild.xml index 930b00be4..8e030572a 100644 --- a/cpp/third_party/zlib-1.3.1/zlib-1.3.1/treebuild.xml +++ b/cpp/third_party/zlib-1.3.1/zlib-1.3.1/treebuild.xml @@ -1,103 +1,99 @@ -<?xml version="1.0" ?> +<?xml version="1.0" encoding="UTF-8"?> <package name="zlib" version="1.3.1"> <library name="zlib" dlversion="1.3.1" dlname="z"> - <property name="description"> zip compression library </property> - <property name="include-target-dir" value="$(@PACKAGE/install-includedir)" /> - - <!-- fixme: not implemented yet --> - <property name="compiler/c/inline" value="yes" /> - - <include-file name="zlib.h" scope="public" mode="644" /> - <include-file name="zconf.h" scope="public" mode="644" /> - - <source name="adler32.c"> - <depend name="zlib.h" /> - <depend name="zconf.h" /> - </source> - <source name="compress.c"> - <depend name="zlib.h" /> - <depend name="zconf.h" /> - </source> - <source name="crc32.c"> - <depend name="zlib.h" /> - <depend name="zconf.h" /> - <depend name="crc32.h" /> - </source> - <source name="gzclose.c"> - <depend name="zlib.h" /> - <depend name="zconf.h" /> - <depend name="gzguts.h" /> - </source> - <source name="gzlib.c"> - <depend name="zlib.h" /> - <depend name="zconf.h" /> - <depend name="gzguts.h" /> - </source> - <source name="gzread.c"> - <depend name="zlib.h" /> - <depend name="zconf.h" /> - <depend 
name="gzguts.h" /> - </source> - <source name="gzwrite.c"> - <depend name="zlib.h" /> - <depend name="zconf.h" /> - <depend name="gzguts.h" /> - </source> - <source name="uncompr.c"> - <depend name="zlib.h" /> - <depend name="zconf.h" /> - </source> - <source name="deflate.c"> - <depend name="zlib.h" /> - <depend name="zconf.h" /> - <depend name="zutil.h" /> - <depend name="deflate.h" /> - </source> - <source name="trees.c"> - <depend name="zlib.h" /> - <depend name="zconf.h" /> - <depend name="zutil.h" /> - <depend name="deflate.h" /> - <depend name="trees.h" /> - </source> - <source name="zutil.c"> - <depend name="zlib.h" /> - <depend name="zconf.h" /> - <depend name="zutil.h" /> - </source> - <source name="inflate.c"> - <depend name="zlib.h" /> - <depend name="zconf.h" /> - <depend name="zutil.h" /> - <depend name="inftrees.h" /> - <depend name="inflate.h" /> - <depend name="inffast.h" /> - </source> - <source name="infback.c"> - <depend name="zlib.h" /> - <depend name="zconf.h" /> - <depend name="zutil.h" /> - <depend name="inftrees.h" /> - <depend name="inflate.h" /> - <depend name="inffast.h" /> - </source> - <source name="inftrees.c"> - <depend name="zlib.h" /> - <depend name="zconf.h" /> - <depend name="zutil.h" /> - <depend name="inftrees.h" /> - </source> - <source name="inffast.c"> - <depend name="zlib.h" /> - <depend name="zconf.h" /> - <depend name="zutil.h" /> - <depend name="inftrees.h" /> - <depend name="inflate.h" /> - <depend name="inffast.h" /> - </source> + <property name="description">zip compression library</property> + <property name="include-target-dir" value="$(@PACKAGE/install-includedir)"/> + <!-- fixme: not implemented yet --> + <property name="compiler/c/inline" value="yes"/> + <include-file name="zlib.h" scope="public" mode="644"/> + <include-file name="zconf.h" scope="public" mode="644"/> + <source name="adler32.c"> + <depend name="zlib.h"/> + <depend name="zconf.h"/> + </source> + <source name="compress.c"> + <depend name="zlib.h"/> 
+ <depend name="zconf.h"/> + </source> + <source name="crc32.c"> + <depend name="zlib.h"/> + <depend name="zconf.h"/> + <depend name="crc32.h"/> + </source> + <source name="gzclose.c"> + <depend name="zlib.h"/> + <depend name="zconf.h"/> + <depend name="gzguts.h"/> + </source> + <source name="gzlib.c"> + <depend name="zlib.h"/> + <depend name="zconf.h"/> + <depend name="gzguts.h"/> + </source> + <source name="gzread.c"> + <depend name="zlib.h"/> + <depend name="zconf.h"/> + <depend name="gzguts.h"/> + </source> + <source name="gzwrite.c"> + <depend name="zlib.h"/> + <depend name="zconf.h"/> + <depend name="gzguts.h"/> + </source> + <source name="uncompr.c"> + <depend name="zlib.h"/> + <depend name="zconf.h"/> + </source> + <source name="deflate.c"> + <depend name="zlib.h"/> + <depend name="zconf.h"/> + <depend name="zutil.h"/> + <depend name="deflate.h"/> + </source> + <source name="trees.c"> + <depend name="zlib.h"/> + <depend name="zconf.h"/> + <depend name="zutil.h"/> + <depend name="deflate.h"/> + <depend name="trees.h"/> + </source> + <source name="zutil.c"> + <depend name="zlib.h"/> + <depend name="zconf.h"/> + <depend name="zutil.h"/> + </source> + <source name="inflate.c"> + <depend name="zlib.h"/> + <depend name="zconf.h"/> + <depend name="zutil.h"/> + <depend name="inftrees.h"/> + <depend name="inflate.h"/> + <depend name="inffast.h"/> + </source> + <source name="infback.c"> + <depend name="zlib.h"/> + <depend name="zconf.h"/> + <depend name="zutil.h"/> + <depend name="inftrees.h"/> + <depend name="inflate.h"/> + <depend name="inffast.h"/> + </source> + <source name="inftrees.c"> + <depend name="zlib.h"/> + <depend name="zconf.h"/> + <depend name="zutil.h"/> + <depend name="inftrees.h"/> + </source> + <source name="inffast.c"> + <depend name="zlib.h"/> + <depend name="zconf.h"/> + <depend name="zutil.h"/> + <depend name="inftrees.h"/> + <depend name="inflate.h"/> + <depend name="inffast.h"/> + </source> </library> </package> - <!-- CFLAGS=-O #CFLAGS=-O 
-DMAX_WBITS=14 -DMAX_MEM_LEVEL=7 diff --git a/python/Untitled b/python/Untitled new file mode 100644 index 000000000..4197b94e5 --- /dev/null +++ b/python/Untitled @@ -0,0 +1 @@ +**/node_modules/** \ No newline at end of file diff --git a/python/lower_case_name.tsfile b/python/lower_case_name.tsfile new file mode 100644 index 000000000..732a9f94e Binary files /dev/null and b/python/lower_case_name.tsfile differ diff --git a/python/requirements.txt b/python/requirements.txt index 5b1c19aa2..fcb05c3ae 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -21,5 +21,6 @@ cython==3.0.10 numpy==1.26.4 pandas==2.2.2 setuptools==78.1.1 wheel==0.46.2 +pyarrow diff --git a/python/setup.py b/python/setup.py index 9f2a1a37a..40fe4e51a 100644 --- a/python/setup.py +++ b/python/setup.py @@ -89,6 +89,9 @@ else: tsfile_include_dir = os.path.join(project_dir, "tsfile", "include") +extra_compile_args = ["-O0", "-g3", "-fno-omit-frame-pointer"] +extra_link_args = ["-g"] + ext_modules_tsfile = [ # utils: from python to c or c to python.
Extension( @@ -98,8 +101,10 @@ ext_modules_tsfile = [ library_dirs=[tsfile_shared_dir], include_dirs=[tsfile_include_dir, np.get_include()], runtime_library_dirs=[tsfile_shared_dir] if system != "Windows" else None, + define_macros=[("CYTHON_TRACE_NOGIL", "1")], extra_compile_args=( - ["-std=c++11"] if system != "Windows" else ["-std=c++11", "-DMS_WIN64"] + ["-std=c++11", "-O0", "-g3", "-fno-omit-frame-pointer"] if system != "Windows" else ["-std=c++11", + "-DMS_WIN64"] ), language="c++", ), @@ -112,8 +117,10 @@ ext_modules_tsfile = [ depends=[os.path.join("tsfile", "tsfile_py_cpp.pxd")], include_dirs=[tsfile_include_dir, np.get_include()], runtime_library_dirs=[tsfile_shared_dir] if system != "Windows" else None, + define_macros=[("CYTHON_TRACE_NOGIL", "1")], extra_compile_args=( - ["-std=c++11"] if system != "Windows" else ["-std=c++11", "-DMS_WIN64"] + ["-std=c++11", "-O0", "-g3", "-fno-omit-frame-pointer"] if system != "Windows" else ["-std=c++11", + "-DMS_WIN64"] ), language="c++", ), @@ -125,9 +132,10 @@ ext_modules_tsfile = [ library_dirs=[tsfile_shared_dir], depends=[os.path.join("tsfile", "tsfile_py_cpp.pxd")], include_dirs=[tsfile_include_dir, np.get_include()], + define_macros=[("CYTHON_TRACE_NOGIL", "1")], runtime_library_dirs=[tsfile_shared_dir] if system != "Windows" else None, extra_compile_args=( - ["-std=c++11"] if system != "Windows" else ["-std=c++11", "-DMS_WIN64"] + ["-std=c++11", "-O0", "-g3", "-fno-omit-frame-pointer"] if system != "Windows" else ["-std=c++11", "-DMS_WIN64"] ), language="c++", ) @@ -165,5 +173,7 @@ setup( "*.pxd" ] }, + annotate=True, + gdb_debug=True, include_package_data=True, ) diff --git a/python/test1.tsfile b/python/test1.tsfile new file mode 100644 index 000000000..85264f12c Binary files /dev/null and b/python/test1.tsfile differ diff --git a/python/tests/bench_batch_arrow_vs_dataframe.py b/python/tests/bench_batch_arrow_vs_dataframe.py new file mode 100644 index 000000000..082e0c7a8 --- /dev/null +++ 
b/python/tests/bench_batch_arrow_vs_dataframe.py @@ -0,0 +1,264 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +""" +Benchmark: query_table_batch + read_arrow_batch vs query_table + read_data_frame. + +Compares throughput and elapsed time when reading the same table data via + - Arrow path: query_table_batch(batch_size=N) then read_arrow_batch() in a loop + - DataFrame path: query_table() then result.next() + read_data_frame(N) in a loop + +Run from project root or python/tests, e.g.: + python -m pytest tests/bench_batch_arrow_vs_dataframe.py -v -s + python tests/bench_batch_arrow_vs_dataframe.py # if run as script +""" + +import os +import sys +import time +from os import remove + +import pandas as pd +import pytest + +from tsfile import ( + ColumnSchema, + ColumnCategory, + TSDataType, + TableSchema, + Tablet, + TsFileReader, + TsFileTableWriter, +) + +# Default benchmark size +DEFAULT_ROW_COUNT = 50_000 +DEFAULT_BATCH_SIZE = 4096 +DEFAULT_WARMUP_ROUNDS = 1 +DEFAULT_TIMED_ROUNDS = 3 + +BENCH_FILE = "bench_arrow_vs_dataframe.tsfile" +TABLE_NAME = "bench_table" +COLUMNS = ["device", "value1", "value2"] + + +def _ensure_bench_tsfile(file_path: str, row_count: int) -> None: + """Create tsfile with table data if not present. 
Uses DataFrame for fast data generation.""" + if os.path.exists(file_path): + remove(file_path) + # Build data with pandas/numpy (vectorized, much faster than row-by-row Tablet) + import numpy as np + df = pd.DataFrame({ + "time": np.arange(row_count, dtype=np.int64), + "device": pd.Series([f"device_{i}" for i in range(row_count)]), + "value1": np.arange(0, row_count * 10, 10, dtype=np.int64), + "value2": np.arange(row_count, dtype=np.float64) * 1.5, + }) + + table = TableSchema( + TABLE_NAME, + [ + ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("value1", TSDataType.INT64, ColumnCategory.FIELD), + ColumnSchema("value2", TSDataType.DOUBLE, ColumnCategory.FIELD), + ], + ) + chunk = min(row_count, 10_000) + tablet = Tablet( + COLUMNS, + [TSDataType.STRING, TSDataType.INT64, TSDataType.DOUBLE], + chunk, + ) + with TsFileTableWriter(file_path, table) as writer: + for start in range(0, row_count, chunk): + end = min(start + chunk, row_count) + n = end - start + chunk_df = df.iloc[start:end] + # Bulk-fill tablet from DataFrame (column order: device, value1, value2) + tablet.timestamp_list[:n] = chunk_df["time"].tolist() + tablet.data_list[0][:n] = chunk_df["device"].tolist() + tablet.data_list[1][:n] = chunk_df["value1"].tolist() + tablet.data_list[2][:n] = chunk_df["value2"].tolist() + # Unused rows must be None so C side skips them + for i in range(n, chunk): + tablet.timestamp_list[i] = None + for col_idx in range(len(COLUMNS)): + tablet.data_list[col_idx][i] = None + writer.write_table(tablet) + + +def _read_via_arrow(file_path: str, batch_size: int, end_time: int) -> int: + """Read all rows using query_table_batch + read_arrow_batch. 
Returns total rows.""" + import pyarrow as pa # noqa: F401 + + reader = TsFileReader(file_path) + result_set = reader.query_table_batch( + table_name=TABLE_NAME, + column_names=COLUMNS, + start_time=0, + end_time=end_time, + batch_size=batch_size, + ) + total_rows = 0 + try: + while True: + batch = result_set.read_arrow_batch() + if batch is None: + break + total_rows += len(batch) + finally: + result_set.close() + reader.close() + return total_rows + + +def _read_via_dataframe(file_path: str, batch_size: int, end_time: int) -> int: + """Read all rows using query_table + next + read_data_frame. Returns total rows.""" + reader = TsFileReader(file_path) + result_set = reader.query_table( + TABLE_NAME, + COLUMNS, + start_time=0, + end_time=end_time, + ) + total_rows = 0 + try: + while result_set.next(): + df = result_set.read_data_frame(max_row_num=batch_size) + if df is None or len(df) == 0: + break + total_rows += len(df) + finally: + result_set.close() + reader.close() + return total_rows + + +def _run_timed(name: str, func, *args, rounds: int = DEFAULT_TIMED_ROUNDS): + times = [] + for _ in range(rounds): + start = time.perf_counter() + n = func(*args) + elapsed = time.perf_counter() - start + times.append(elapsed) + avg = sum(times) / len(times) + total_rows = n + rows_per_sec = total_rows / avg if avg > 0 else 0 + print(f" {name}: {avg:.3f}s avg ({min(times):.3f}s min) rows={total_rows} {rows_per_sec:.0f} rows/s") + return avg, total_rows + + +def run_benchmark( + row_count: int = DEFAULT_ROW_COUNT, + batch_size: int = DEFAULT_BATCH_SIZE, + warmup_rounds: int = DEFAULT_WARMUP_ROUNDS, + timed_rounds: int = DEFAULT_TIMED_ROUNDS, + file_path: str = BENCH_FILE, +): + try: + import pyarrow as pa # noqa: F401 + pa_available = True + except ImportError: + pa_available = False + + _ensure_bench_tsfile(file_path, row_count) + end_time = row_count + 1 + + print(f"Benchmark: {row_count} rows, batch_size={batch_size}") + print(f" warmup_rounds={warmup_rounds}, 
timed_rounds={timed_rounds}") + if not pa_available: + print(" pyarrow not installed; Arrow path skipped.") + + # Warmup + for _ in range(warmup_rounds): + _read_via_dataframe(file_path, batch_size, end_time) + if pa_available: + for _ in range(warmup_rounds): + _read_via_arrow(file_path, batch_size, end_time) + + # Timed runs + df_avg, df_rows = _run_timed( + "query_table + read_data_frame", + _read_via_dataframe, + file_path, + batch_size, + end_time, + rounds=timed_rounds, + ) + + if pa_available: + arrow_avg, arrow_rows = _run_timed( + "query_table_batch + read_arrow_batch", + _read_via_arrow, + file_path, + batch_size, + end_time, + rounds=timed_rounds, + ) + print() + if df_avg > 0: + speedup = arrow_avg / df_avg + print(f" Arrow vs DataFrame time ratio: {speedup:.2f}x ({'Arrow faster' if speedup < 1 else 'DataFrame faster'})") + assert df_rows == row_count, f"DataFrame path row count {df_rows} != {row_count}" + assert arrow_rows == row_count, f"Arrow path row count {arrow_rows} != {row_count}" + else: + assert df_rows == row_count, f"DataFrame path row count {df_rows} != {row_count}" + + print() + return (df_avg, arrow_avg) if pa_available else (df_avg, None) + + +def test_bench_arrow_vs_dataframe_default(): + """Run benchmark with default size (quick sanity check).""" + run_benchmark( + row_count=5_000, + batch_size=1024, + warmup_rounds=0, + timed_rounds=2, + ) + + +def test_bench_arrow_vs_dataframe_medium(): + """Run benchmark with medium size.""" + run_benchmark( + row_count=DEFAULT_ROW_COUNT, + batch_size=DEFAULT_BATCH_SIZE, + warmup_rounds=DEFAULT_WARMUP_ROUNDS, + timed_rounds=DEFAULT_TIMED_ROUNDS, + ) + + +def test_bench_arrow_vs_dataframe_large(): + run_benchmark( + row_count=200_000, + batch_size=8192, + warmup_rounds=1, + timed_rounds=3, + ) + + +if __name__ == "__main__": + row_count = DEFAULT_ROW_COUNT + batch_size = DEFAULT_BATCH_SIZE + if len(sys.argv) > 1: + row_count = int(sys.argv[1]) + if len(sys.argv) > 2: + batch_size = int(sys.argv[2]) + 
run_benchmark(row_count=row_count, batch_size=batch_size) + # Clean up bench file when run as script (optional) + if os.path.exists(BENCH_FILE): + os.remove(BENCH_FILE) diff --git a/python/tests/test_batch_arrow.py b/python/tests/test_batch_arrow.py new file mode 100644 index 000000000..75bd6c4c1 --- /dev/null +++ b/python/tests/test_batch_arrow.py @@ -0,0 +1,444 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +import os +from datetime import date + +import numpy as np +import pandas as pd +import pytest + +from tsfile import ColumnSchema, TableSchema, TSDataType, ColumnCategory +from tsfile import Tablet +from tsfile import TsFileTableWriter, TsFileReader + + +def test_batch_read_arrow_basic(): + file_path = "test_batch_arrow_basic.tsfile" + table = TableSchema( + "test_table", + [ + ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("value1", TSDataType.INT64, ColumnCategory.FIELD), + ColumnSchema("value2", TSDataType.DOUBLE, ColumnCategory.FIELD), + ], + ) + + try: + if os.path.exists(file_path): + os.remove(file_path) + print("t1") + + with TsFileTableWriter(file_path, table) as writer: + tablet = Tablet( + ["device", "value1", "value2"], + [TSDataType.STRING, TSDataType.INT64, TSDataType.DOUBLE], + 1000, + ) + for i in range(1000): + tablet.add_timestamp(i, i) + tablet.add_value_by_name("device", i, f"device_{i}") + tablet.add_value_by_name("value1", i, i * 10) + tablet.add_value_by_name("value2", i, i * 1.5) + writer.write_table(tablet) + try: + import pyarrow as pa + except ImportError: + pytest.skip("pyarrow is not installed") + + reader = TsFileReader(file_path) + result_set = reader.query_table_batch( + table_name="test_table", + column_names=["device", "value1", "value2"], + start_time=0, + end_time=1000, + batch_size=256, + ) + + total_rows = 0 + batch_count = 0 + while True: + table = result_set.read_arrow_batch() + if table is None: + break + + batch_count += 1 + assert isinstance(table, pa.Table) + assert len(table) > 0 + total_rows += len(table) + + column_names = table.column_names + assert "time" in column_names + assert "device" in column_names + assert "value1" in column_names + assert "value2" in column_names + + assert total_rows == 1000 + assert batch_count > 0 + + result_set.close() + reader.close() + + finally: + if os.path.exists(file_path): + os.remove(file_path) + + +def 
test_batch_read_arrow_compare_with_dataframe(): + file_path = "test_batch_arrow_compare.tsfile" + table = TableSchema( + "test_table", + [ + ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("value1", TSDataType.INT32, ColumnCategory.FIELD), + ColumnSchema("value2", TSDataType.FLOAT, ColumnCategory.FIELD), + ColumnSchema("value3", TSDataType.BOOLEAN, ColumnCategory.FIELD), + ], + ) + + try: + if os.path.exists(file_path): + os.remove(file_path) + + with TsFileTableWriter(file_path, table) as writer: + tablet = Tablet( + ["device", "value1", "value2", "value3"], + [TSDataType.STRING, TSDataType.INT32, TSDataType.FLOAT, TSDataType.BOOLEAN], + 500, + ) + for i in range(500): + tablet.add_timestamp(i, i) + tablet.add_value_by_name("device", i, f"device_{i}") + tablet.add_value_by_name("value1", i, i * 2) + tablet.add_value_by_name("value2", i, i * 1.1) + tablet.add_value_by_name("value3", i, i % 2 == 0) + writer.write_table(tablet) + + try: + import pyarrow as pa + except ImportError: + pytest.skip("pyarrow is not installed") + + reader1 = TsFileReader(file_path) + result_set1 = reader1.query_table_batch( + table_name="test_table", + column_names=["device", "value1", "value2", "value3"], + start_time=0, + end_time=500, + batch_size=100, + ) + + arrow_tables = [] + while True: + table = result_set1.read_arrow_batch() + if table is None: + break + arrow_tables.append(table) + + if arrow_tables: + combined_arrow_table = pa.concat_tables(arrow_tables) + df_arrow = combined_arrow_table.to_pandas() + else: + df_arrow = pd.DataFrame() + + result_set1.close() + reader1.close() + reader2 = TsFileReader(file_path) + result_set2 = reader2.query_table( + table_name="test_table", + column_names=["device", "value1", "value2", "value3"], + start_time=0, + end_time=500, + ) + + df_traditional = result_set2.read_data_frame(max_row_num=1000) + result_set2.close() + reader2.close() + + assert len(df_arrow) == len(df_traditional) + assert len(df_arrow) == 500 + 
+ for col in ["time", "device", "value1", "value2", "value3"]: + assert col in df_arrow.columns + assert col in df_traditional.columns + + df_arrow_sorted = df_arrow.sort_values("time").reset_index(drop=True) + df_traditional_sorted = df_traditional.sort_values("time").reset_index(drop=True) + + for i in range(len(df_arrow_sorted)): + assert df_arrow_sorted.iloc[i]["time"] == df_traditional_sorted.iloc[i]["time"] + assert df_arrow_sorted.iloc[i]["device"] == df_traditional_sorted.iloc[i]["device"] + assert df_arrow_sorted.iloc[i]["value1"] == df_traditional_sorted.iloc[i]["value1"] + assert abs(df_arrow_sorted.iloc[i]["value2"] - df_traditional_sorted.iloc[i]["value2"]) < 1e-5 + assert df_arrow_sorted.iloc[i]["value3"] == df_traditional_sorted.iloc[i]["value3"] + + finally: + if os.path.exists(file_path): + os.remove(file_path) + + +def test_batch_read_arrow_empty_result(): + file_path = "test_batch_arrow_empty.tsfile" + table = TableSchema( + "test_table", + [ + ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("value", TSDataType.INT64, ColumnCategory.FIELD), + ], + ) + + try: + if os.path.exists(file_path): + os.remove(file_path) + + with TsFileTableWriter(file_path, table) as writer: + tablet = Tablet( + ["device", "value"], + [TSDataType.STRING, TSDataType.INT64], + 10, + ) + for i in range(10): + tablet.add_timestamp(i, i) + tablet.add_value_by_name("device", i, f"device_{i}") + tablet.add_value_by_name("value", i, i) + writer.write_table(tablet) + + try: + import pyarrow as pa + except ImportError: + pytest.skip("pyarrow is not installed") + + reader = TsFileReader(file_path) + result_set = reader.query_table_batch( + table_name="test_table", + column_names=["device", "value"], + start_time=1000, + end_time=2000, + batch_size=100, + ) + + table = result_set.read_arrow_batch() + assert table is None + + result_set.close() + reader.close() + + finally: + if os.path.exists(file_path): + os.remove(file_path) + + +def 
def test_batch_read_arrow_time_range():
    """Batched Arrow reads honor [start_time, end_time]: every returned row's
    time falls in range and the total row count is exactly the range size."""
    file_path = "test_batch_arrow_time_range.tsfile"
    schema = TableSchema(
        "test_table",
        [
            ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
            ColumnSchema("value", TSDataType.INT64, ColumnCategory.FIELD),
        ],
    )

    try:
        if os.path.exists(file_path):
            os.remove(file_path)

        with TsFileTableWriter(file_path, schema) as writer:
            tablet = Tablet(
                ["device", "value"],
                [TSDataType.STRING, TSDataType.INT64],
                1000,
            )
            for i in range(1000):
                tablet.add_timestamp(i, i)
                tablet.add_value_by_name("device", i, f"device_{i}")
                tablet.add_value_by_name("value", i, i)
            writer.write_table(tablet)

        # pyarrow is an optional dependency: skip (not fail) when absent.
        try:
            import pyarrow as pa  # noqa: F401
        except ImportError:
            pytest.skip("pyarrow is not installed")

        reader = TsFileReader(file_path)
        result_set = reader.query_table_batch(
            table_name="test_table",
            column_names=["device", "value"],
            start_time=100,
            end_time=199,
            batch_size=50,
        )

        total_rows = 0
        while True:
            # Distinct name: the original shadowed the TableSchema variable
            # with the returned pyarrow.Table, which was confusing.
            arrow_table = result_set.read_arrow_batch()
            if arrow_table is None:
                break
            total_rows += len(arrow_table)
            df = arrow_table.to_pandas()
            assert df["time"].min() >= 100
            assert df["time"].max() <= 199

        # Rows 100..199 inclusive -> exactly 100 rows.
        assert total_rows == 100

        result_set.close()
        reader.close()

    finally:
        if os.path.exists(file_path):
            os.remove(file_path)


def test_batch_read_arrow_all_datatypes():
    """Batched Arrow reads round-trip every supported column data type and
    expose all columns (plus the implicit "time" column) in the result."""
    file_path = "test_batch_arrow_all_datatypes.tsfile"
    schema = TableSchema(
        "test_table",
        [
            ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
            ColumnSchema("bool_val", TSDataType.BOOLEAN, ColumnCategory.FIELD),
            ColumnSchema("int32_val", TSDataType.INT32, ColumnCategory.FIELD),
            ColumnSchema("int64_val", TSDataType.INT64, ColumnCategory.FIELD),
            ColumnSchema("float_val", TSDataType.FLOAT, ColumnCategory.FIELD),
            ColumnSchema("double_val", TSDataType.DOUBLE, ColumnCategory.FIELD),
            ColumnSchema("string_val", TSDataType.STRING, ColumnCategory.FIELD),
            ColumnSchema("date_val", TSDataType.DATE, ColumnCategory.FIELD),
        ],
    )

    column_names = [
        "device", "bool_val", "int32_val", "int64_val",
        "float_val", "double_val", "string_val", "date_val",
    ]

    try:
        if os.path.exists(file_path):
            os.remove(file_path)

        with TsFileTableWriter(file_path, schema) as writer:
            tablet = Tablet(
                column_names,
                [
                    TSDataType.STRING,
                    TSDataType.BOOLEAN,
                    TSDataType.INT32,
                    TSDataType.INT64,
                    TSDataType.FLOAT,
                    TSDataType.DOUBLE,
                    TSDataType.STRING,
                    TSDataType.DATE,
                ],
                200,
            )
            for i in range(200):
                tablet.add_timestamp(i, i)
                tablet.add_value_by_name("device", i, f"device_{i}")
                tablet.add_value_by_name("bool_val", i, i % 2 == 0)
                tablet.add_value_by_name("int32_val", i, i * 2)
                tablet.add_value_by_name("int64_val", i, i * 3)
                tablet.add_value_by_name("float_val", i, i * 1.1)
                tablet.add_value_by_name("double_val", i, i * 2.2)
                tablet.add_value_by_name("string_val", i, f"string_{i}")
                tablet.add_value_by_name("date_val", i, date(2025, 1, (i % 28) + 1))
            writer.write_table(tablet)

        # pyarrow is an optional dependency: skip (not fail) when absent.
        try:
            import pyarrow as pa  # noqa: F401
        except ImportError:
            pytest.skip("pyarrow is not installed")

        reader = TsFileReader(file_path)
        result_set = reader.query_table_batch(
            table_name="test_table",
            column_names=column_names,
            start_time=0,
            end_time=200,
            batch_size=50,
        )

        total_rows = 0
        while True:
            arrow_table = result_set.read_arrow_batch()
            if arrow_table is None:
                break

            total_rows += len(arrow_table)
            df = arrow_table.to_pandas()

            # Every requested column plus the implicit time column must be present.
            for expected_column in ["time"] + column_names:
                assert expected_column in df.columns

        assert total_rows == 200

        result_set.close()
        reader.close()

    finally:
        if os.path.exists(file_path):
            os.remove(file_path)


def test_batch_read_arrow_no_pyarrow():
    """query_table_batch itself must not require pyarrow: opening and closing a
    batch result set succeeds even when read_arrow_batch() is never called."""
    file_path = "test_batch_arrow_no_pyarrow.tsfile"
    schema = TableSchema(
        "test_table",
        [
            ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
            ColumnSchema("value", TSDataType.INT64, ColumnCategory.FIELD),
        ],
    )

    try:
        if os.path.exists(file_path):
            os.remove(file_path)

        with TsFileTableWriter(file_path, schema) as writer:
            tablet = Tablet(
                ["device", "value"],
                [TSDataType.STRING, TSDataType.INT64],
                10,
            )
            for i in range(10):
                tablet.add_timestamp(i, i)
                tablet.add_value_by_name("device", i, f"device_{i}")
                tablet.add_value_by_name("value", i, i)
            writer.write_table(tablet)

        reader = TsFileReader(file_path)
        result_set = reader.query_table_batch(
            table_name="test_table",
            column_names=["device", "value"],
            start_time=0,
            end_time=10,
            batch_size=5,
        )
        result_set.close()
        reader.close()

    finally:
        if os.path.exists(file_path):
            os.remove(file_path)


if __name__ == "__main__":
    os.chdir(os.path.dirname(os.path.abspath(__file__)))
    pytest.main([
        "test_batch_arrow.py",
        "-s", "-v"
    ])
# ---------------------------------------------------------------------------
# python/tsfile/tsfile_cpp.pxd -- Arrow C Data Interface declarations.
# Field order/types mirror arrow/c/abi.h and must stay ABI-identical; a NULL
# `release` callback marks a structure as already consumed/empty.
# ---------------------------------------------------------------------------
    ctypedef struct ArrowSchema:
        const char* format
        const char* name
        const char* metadata
        int64_t flags
        int64_t n_children
        ArrowSchema** children
        ArrowSchema* dictionary
        void (*release)(ArrowSchema*)
        void* private_data

    ctypedef struct ArrowArray:
        int64_t length
        int64_t null_count
        int64_t offset
        int64_t n_buffers
        int64_t n_children
        const void** buffers
        ArrowArray** children
        ArrowArray* dictionary
        void (*release)(ArrowArray*)
        void* private_data

    # Fills out_array/out_schema with the next block of query results;
    # returns a non-zero ErrorCode on failure or exhaustion.
    ErrorCode tsfile_result_set_get_next_tsblock_as_arrow(ResultSet result_set,
                                                          ArrowArray* out_array,
                                                          ArrowSchema* out_schema);


# ---------------------------------------------------------------------------
# python/tsfile/tsfile_py_cpp.pxd -- public declaration of the batch query.
# ---------------------------------------------------------------------------
cdef public api ResultSet tsfile_reader_query_table_batch_c(TsFileReader reader, object table_name, object column_list,
                                                            int64_t start_time, int64_t end_time, int batch_size)


# ---------------------------------------------------------------------------
# python/tsfile/tsfile_py_cpp.pyx
# ---------------------------------------------------------------------------
cdef ResultSet tsfile_reader_query_table_batch_c(TsFileReader reader, object table_name, object column_list,
                                                 int64_t start_time, int64_t end_time, int batch_size):
    """Marshal Python strings to C and run a batched table query.

    Returns the C ResultSet handle on success; raises MemoryError on
    allocation failure, or whatever check_error() maps the C error code to.
    """
    cdef ResultSet result
    cdef int column_num = len(column_list)
    cdef bytes table_name_bytes = PyUnicode_AsUTF8String(table_name)
    cdef const char * table_name_c = table_name_bytes
    cdef char** columns = <char**> malloc(sizeof(char *) * column_num)
    cdef int i
    cdef ErrorCode code = 0
    if columns == NULL:
        raise MemoryError("Failed to allocate memory for columns")
    # BUGFIX: malloc() returns uninitialized memory.  If strdup() failed
    # part-way through the fill loop below, the cleanup in `finally` would
    # free() garbage pointers (undefined behavior).  NULL-initialize every
    # slot first; free(NULL) is a well-defined no-op.
    for i in range(column_num):
        columns[i] = NULL
    try:
        for i in range(column_num):
            columns[i] = strdup((<str> column_list[i]).encode('utf-8'))
            if columns[i] == NULL:
                raise MemoryError("Failed to allocate memory for column name")
        result = tsfile_query_table_batch(reader, table_name_c, columns, column_num,
                                          start_time, end_time, batch_size, &code)
        check_error(code)
        return result
    finally:
        # Safe for partially-filled arrays thanks to the NULL-init above.
        for i in range(column_num):
            free(<void *> columns[i])
            columns[i] = NULL
        free(<void *> columns)
        columns = NULL


# ---------------------------------------------------------------------------
# python/tsfile/tsfile_reader.pyx
# NOTE(review): the original hunk re-imported INT64_MIN/INT64_MAX (already
# cimported above) and imported pyarrow unconditionally at module scope,
# turning an optional dependency into a hard one.  Only the genuinely new
# names are cimported here; pyarrow is imported lazily in read_arrow_batch().
# ---------------------------------------------------------------------------
from libc.string cimport memset
from libc.stdint cimport uintptr_t

    def read_arrow_batch(self):
        """Read the next batch of rows as a pyarrow.Table.

        Returns None when the result set is exhausted or the C layer reports
        an error (the C API does not currently distinguish the two cases --
        TODO confirm against tsfile_result_set_get_next_tsblock_as_arrow).
        Raises ImportError when pyarrow is not installed.
        """
        self.check_result_set_invalid()

        # Lazy import keeps pyarrow optional for every other reader API.
        import pyarrow as pa

        cdef ArrowArray arrow_array
        cdef ArrowSchema arrow_schema
        cdef ErrorCode code = 0

        # Zero the structs so `release == NULL` reliably means "nothing
        # exported" even when the C call fills nothing in.
        memset(&arrow_array, 0, sizeof(ArrowArray))
        memset(&arrow_schema, 0, sizeof(ArrowSchema))

        code = tsfile_result_set_get_next_tsblock_as_arrow(self.result, &arrow_array, &arrow_schema)
        if code != 0:
            return None

        if arrow_schema.release == NULL or arrow_array.release == NULL:
            return None

        try:
            # _import_from_c takes ownership of both structures on success.
            batch = pa.RecordBatch._import_from_c(<uintptr_t>&arrow_array,
                                                  <uintptr_t>&arrow_schema)
            return pa.Table.from_batches([batch])
        except Exception:
            # Import failed before ownership transfer: release manually so
            # the exported buffers are not leaked, then re-raise with the
            # original traceback.
            if arrow_array.release != NULL:
                arrow_array.release(&arrow_array)
            if arrow_schema.release != NULL:
                arrow_schema.release(&arrow_schema)
            raise

    def query_table_batch(self, table_name : str, column_names : List[str],
                          start_time : int = INT64_MIN, end_time : int = INT64_MAX,
                          batch_size : int = 1024) -> ResultSetPy:
        """Run a batched table query; consume it with read_arrow_batch().

        Table and column names are lower-cased to match tsfile's storage
        convention (consistent with query_table above).  batch_size bounds
        the number of rows per returned Arrow batch.
        """
        cdef ResultSet result
        result = tsfile_reader_query_table_batch_c(self.reader, table_name.lower(),
                                                   [column_name.lower() for column_name in column_names],
                                                   start_time, end_time, batch_size)
        pyresult = ResultSetPy(self)
        pyresult.init_c(result, table_name)
        # Track the live result set so the reader can invalidate it on close.
        self.activate_result_set_list.add(pyresult)
        return pyresult


# ---------------------------------------------------------------------------
# python/tsfile/utils.py
# NOTE(review): this import targets a pandas-internal module and nothing in
# the visible code uses it -- it looks like an accidental IDE auto-import.
# Kept only because the rest of utils.py is not visible here; confirm it is
# unused and delete it.
# ---------------------------------------------------------------------------
from pandas.core.interchange.dataframe_protocol import DataFrame
