This is an automated email from the ASF dual-hosted git repository.
colinlee pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new 8da85750 Fix inability to read files exceeding int32 size limit (#726)
8da85750 is described below
commit 8da85750b64a8fdca8b8cf43c2c2acc31ef4e88e
Author: Colin Lee <[email protected]>
AuthorDate: Tue Feb 24 18:24:28 2026 +0800
Fix inability to read files exceeding int32 size limit (#726)
---
cpp/examples/c_examples/demo_read.c | 9 +++--
cpp/src/encoding/float_sprintz_decoder.h | 10 ++---
cpp/src/encoding/gorilla_decoder.h | 2 +-
cpp/src/encoding/int32_sprintz_decoder.h | 4 +-
cpp/src/encoding/int64_sprintz_decoder.h | 2 +-
cpp/src/encoding/ts2diff_decoder.h | 16 ++++----
cpp/src/file/read_file.cc | 8 ++--
cpp/src/file/read_file.h | 8 ++--
cpp/src/file/tsfile_io_reader.cc | 24 ++++++------
cpp/src/file/tsfile_io_reader.h | 4 +-
cpp/src/reader/chunk_reader.cc | 2 +-
cpp/src/reader/device_meta_iterator.cc | 8 ++--
cpp/src/reader/filter/tag_filter.cc | 2 +-
cpp/src/reader/filter/tag_filter.h | 3 +-
cpp/src/writer/chunk_writer.cc | 2 +-
cpp/src/writer/chunk_writer.h | 5 +--
cpp/test/reader/tsfile_reader_test.cc | 65 +++++++++++++++++++++++++++++++-
17 files changed, 119 insertions(+), 55 deletions(-)
diff --git a/cpp/examples/c_examples/demo_read.c b/cpp/examples/c_examples/demo_read.c
index 05cc8620..5ac6111e 100644
--- a/cpp/examples/c_examples/demo_read.c
+++ b/cpp/examples/c_examples/demo_read.c
@@ -55,7 +55,7 @@ ERRNO read_tsfile() {
// Timestamp at column 1 and column index begin from 1.
Timestamp timestamp =
tsfile_result_set_get_value_by_index_int64_t(ret, 1);
- printf("%ld\n", timestamp);
+ printf("%lld\n", (long long)timestamp);
for (int i = 1; i <= column_num; i++) {
if (tsfile_result_set_is_null_by_index(ret, i)) {
printf(" null ");
@@ -72,9 +72,10 @@ ERRNO read_tsfile() {
i));
break;
case TS_DATATYPE_INT64:
- printf("%ld\n",
- tsfile_result_set_get_value_by_index_int64_t(ret,
- i));
+ printf("%lld\n",
+ (long long)
+ tsfile_result_set_get_value_by_index_int64_t(
+ ret, i));
break;
case TS_DATATYPE_FLOAT:
printf(
diff --git a/cpp/src/encoding/float_sprintz_decoder.h b/cpp/src/encoding/float_sprintz_decoder.h
index 55801734..e3842eaa 100644
--- a/cpp/src/encoding/float_sprintz_decoder.h
+++ b/cpp/src/encoding/float_sprintz_decoder.h
@@ -56,20 +56,20 @@ class FloatSprintzDecoder : public SprintzDecoder {
predict_scheme_ = method;
}
- int read_boolean(bool& ret_value, common::ByteStream& in) {
+ int read_boolean(bool& ret_value, common::ByteStream& in) override {
return common::E_TYPE_NOT_MATCH;
}
- int read_int32(int32_t& ret_value, common::ByteStream& in) {
+ int read_int32(int32_t& ret_value, common::ByteStream& in) override {
return common::E_TYPE_NOT_MATCH;
}
- int read_int64(int64_t& ret_value, common::ByteStream& in) {
+ int read_int64(int64_t& ret_value, common::ByteStream& in) override {
return common::E_TYPE_NOT_MATCH;
}
- int read_double(double& ret_value, common::ByteStream& in) {
+ int read_double(double& ret_value, common::ByteStream& in) override {
return common::E_TYPE_NOT_MATCH;
}
int read_String(common::String& ret_value, common::PageArena& pa,
- common::ByteStream& in) {
+ common::ByteStream& in) override {
return common::E_TYPE_NOT_MATCH;
}
diff --git a/cpp/src/encoding/gorilla_decoder.h b/cpp/src/encoding/gorilla_decoder.h
index 4429f2b5..5684561a 100644
--- a/cpp/src/encoding/gorilla_decoder.h
+++ b/cpp/src/encoding/gorilla_decoder.h
@@ -37,7 +37,7 @@ class GorillaDecoder : public Decoder {
~GorillaDecoder() override = default;
- void reset() {
+ void reset() override {
type_ = common::GORILLA;
stored_value_ = 0;
stored_leading_zeros_ = INT32_MAX;
diff --git a/cpp/src/encoding/int32_sprintz_decoder.h b/cpp/src/encoding/int32_sprintz_decoder.h
index df9a7c8f..3d15597e 100644
--- a/cpp/src/encoding/int32_sprintz_decoder.h
+++ b/cpp/src/encoding/int32_sprintz_decoder.h
@@ -55,7 +55,7 @@ class Int32SprintzDecoder : public SprintzDecoder {
predict_scheme_ = method;
}
- bool has_remaining(const common::ByteStream& in) {
+ bool has_remaining(const common::ByteStream& in) override {
uint32_t min_len = sizeof(int32_t) + 1;
return (is_block_read_ && current_count_ < block_size_) ||
in.remaining_size() >= min_len;
@@ -79,7 +79,7 @@ class Int32SprintzDecoder : public SprintzDecoder {
return common::E_TYPE_NOT_MATCH;
}
- int read_int32(int32_t& ret_value, common::ByteStream& in) {
+ int read_int32(int32_t& ret_value, common::ByteStream& in) override {
int ret = common::E_OK;
if (!is_block_read_) {
if (RET_FAIL(decode_block(in))) {
diff --git a/cpp/src/encoding/int64_sprintz_decoder.h b/cpp/src/encoding/int64_sprintz_decoder.h
index 263ce769..a7e3fdd2 100644
--- a/cpp/src/encoding/int64_sprintz_decoder.h
+++ b/cpp/src/encoding/int64_sprintz_decoder.h
@@ -61,7 +61,7 @@ class Int64SprintzDecoder : public SprintzDecoder {
std::fill(current_buffer_.begin(), current_buffer_.end(), 0);
}
- bool has_remaining(const common::ByteStream& in) {
+ bool has_remaining(const common::ByteStream& in) override {
return (is_block_read_ && current_count_ < block_size_) ||
in.has_remaining();
}
diff --git a/cpp/src/encoding/ts2diff_decoder.h b/cpp/src/encoding/ts2diff_decoder.h
index c43dcf9d..32584546 100644
--- a/cpp/src/encoding/ts2diff_decoder.h
+++ b/cpp/src/encoding/ts2diff_decoder.h
@@ -36,7 +36,7 @@ class TS2DIFFDecoder : public Decoder {
TS2DIFFDecoder() { reset(); }
~TS2DIFFDecoder() override {}
- void reset() {
+ void reset() override {
write_index_ = -1;
bits_left_ = 0;
stored_value_ = 0;
@@ -48,7 +48,7 @@ class TS2DIFFDecoder : public Decoder {
current_index_ = 0;
}
- FORCE_INLINE bool has_remaining(const common::ByteStream& buffer) {
+ FORCE_INLINE bool has_remaining(const common::ByteStream& buffer) override {
if (buffer.has_remaining()) return true;
return bits_left_ != 0 || (current_index_ <= write_index_ &&
write_index_ != -1 && current_index_ != 0);
@@ -98,13 +98,13 @@ class TS2DIFFDecoder : public Decoder {
}
T decode(common::ByteStream& in);
- int read_boolean(bool& ret_value, common::ByteStream& in);
- int read_int32(int32_t& ret_value, common::ByteStream& in);
- int read_int64(int64_t& ret_value, common::ByteStream& in);
- int read_float(float& ret_value, common::ByteStream& in);
- int read_double(double& ret_value, common::ByteStream& in);
+ int read_boolean(bool& ret_value, common::ByteStream& in) override;
+ int read_int32(int32_t& ret_value, common::ByteStream& in) override;
+ int read_int64(int64_t& ret_value, common::ByteStream& in) override;
+ int read_float(float& ret_value, common::ByteStream& in) override;
+ int read_double(double& ret_value, common::ByteStream& in) override;
int read_String(common::String& ret_value, common::PageArena& pa,
- common::ByteStream& in);
+ common::ByteStream& in) override;
public:
T first_value_;
diff --git a/cpp/src/file/read_file.cc b/cpp/src/file/read_file.cc
index 6fa9b809..1807883a 100644
--- a/cpp/src/file/read_file.cc
+++ b/cpp/src/file/read_file.cc
@@ -65,14 +65,14 @@ int ReadFile::open(const std::string& file_path) {
return ret;
}
-int ReadFile::get_file_size(int32_t& file_size) {
+int ReadFile::get_file_size(int64_t& file_size) {
struct stat s;
if (fstat(fd_, &s) < 0) {
LOGE("fstat error, file_path=" << file_path_.c_str() << "fd=" << fd_
<< "errno" << errno);
return E_FILE_STAT_ERR;
}
- file_size = s.st_size;
+ file_size = static_cast<int64_t>(s.st_size);
return E_OK;
}
@@ -109,13 +109,13 @@ int ReadFile::check_file_magic() {
return ret;
}
-int ReadFile::read(int32_t offset, char* buf, int32_t buf_size,
+int ReadFile::read(int64_t offset, char* buf, int32_t buf_size,
int32_t& read_len) {
int ret = E_OK;
read_len = 0;
while (read_len < buf_size) {
ssize_t pread_size = ::pread(fd_, buf + read_len, buf_size - read_len,
- offset + read_len);
+ static_cast<off_t>(offset + read_len));
if (pread_size < 0) {
ret = E_FILE_READ_ERR;
////log_err("tsfile reader error, file_path=%s, errno=%d",
diff --git a/cpp/src/file/read_file.h b/cpp/src/file/read_file.h
index 64fe25f5..c3894009 100644
--- a/cpp/src/file/read_file.h
+++ b/cpp/src/file/read_file.h
@@ -37,19 +37,19 @@ class ReadFile {
int open(const std::string& file_path);
FORCE_INLINE bool is_opened() const { return fd_ > 0; }
- FORCE_INLINE int32_t file_size() const { return file_size_; }
+ FORCE_INLINE int64_t file_size() const { return file_size_; }
FORCE_INLINE const std::string& file_path() const { return file_path_; }
/*
* try to reader @buf_size bytes from @offset of this file
* into @buf. @read_len return the actual len reader.
*/
- int read(int32_t offset, char* buf, int32_t buf_size,
+ int read(int64_t offset, char* buf, int32_t buf_size,
int32_t& ret_read_len);
void close();
private:
- int get_file_size(int32_t& file_size);
+ int get_file_size(int64_t& file_size);
int check_file_magic();
private:
@@ -59,7 +59,7 @@ class ReadFile {
private:
std::string file_path_;
int fd_;
- int32_t file_size_;
+ int64_t file_size_;
};
} // end namespace storage
diff --git a/cpp/src/file/tsfile_io_reader.cc b/cpp/src/file/tsfile_io_reader.cc
index e16b6b4a..405c5553 100644
--- a/cpp/src/file/tsfile_io_reader.cc
+++ b/cpp/src/file/tsfile_io_reader.cc
@@ -135,18 +135,20 @@ int TsFileIOReader::load_tsfile_meta() {
int ret = E_OK;
uint32_t tsfile_meta_size = 0;
- int32_t read_offset = 0;
+ int64_t read_offset = 0;
int32_t ret_read_len = 0;
// Step 1: reader the tsfile_meta_size
// 1.1 prepare reader buffer
- int32_t alloc_size = UTIL_MIN(TSFILE_READ_IO_SIZE, file_size());
+ const int64_t fsize = file_size();
+ const int32_t alloc_size = static_cast<int32_t>(
+ UTIL_MIN(static_cast<int64_t>(TSFILE_READ_IO_SIZE), fsize));
char* read_buf = (char*)mem_alloc(alloc_size, MOD_TSFILE_READER);
if (IS_NULL(read_buf)) {
return E_OOM;
}
// 1.2 reader data from file
- read_offset = file_size() - alloc_size;
+ read_offset = fsize - alloc_size;
ret_read_len = 0;
if (RET_FAIL(read_file_->read(read_offset, read_buf, alloc_size,
ret_read_len))) {
@@ -177,7 +179,7 @@ int TsFileIOReader::load_tsfile_meta() {
read_buf = old_read_buf;
ret = E_OOM;
} else if (RET_FAIL(read_file_->read(
- file_size() - tsfile_meta_size -
+ fsize - tsfile_meta_size -
TAIL_MAGIC_AND_META_SIZE_SIZE,
read_buf, tsfile_meta_size, ret_read_len))) {
} else if (tsfile_meta_size != (uint32_t)ret_read_len) {
@@ -226,8 +228,8 @@ int TsFileIOReader::load_timeseries_index_for_ssi(
}
auto& pa = ssi->timeseries_index_pa_;
- int start_offset = device_index_entry->get_offset(),
- end_offset = device_ie_end_offset;
+ int64_t start_offset = device_index_entry->get_offset(),
+ end_offset = device_ie_end_offset;
ASSERT(start_offset < end_offset);
const int32_t read_size = end_offset - start_offset;
int32_t ret_read_len = 0;
@@ -387,8 +389,8 @@ int TsFileIOReader::load_all_measurement_index_entry(
return ret;
}
-int TsFileIOReader::read_device_meta_index(int32_t start_offset,
- int32_t end_offset,
+int TsFileIOReader::read_device_meta_index(int64_t start_offset,
+ int64_t end_offset,
common::PageArena& pa,
MetaIndexNode*& device_meta_index,
bool leaf) {
@@ -428,8 +430,8 @@ int TsFileIOReader::get_timeseries_indexes(
return ret;
}
- int start_offset = device_index_entry->get_offset(),
- end_offset = device_ie_end_offset;
+ int64_t start_offset = device_index_entry->get_offset(),
+ end_offset = device_ie_end_offset;
ASSERT(start_offset < end_offset);
const int32_t read_size = end_offset - start_offset;
int32_t ret_read_len = 0;
@@ -577,7 +579,7 @@ int TsFileIOReader::get_time_column_metadata(
return ret;
}
char* ti_buf = nullptr;
- int start_idx = 0, end_idx = 0;
+ int64_t start_idx = 0, end_idx = 0;
int ret_read_len = 0;
if (measurement_node->node_type_ == LEAF_MEASUREMENT) {
ByteStream buffer;
diff --git a/cpp/src/file/tsfile_io_reader.h b/cpp/src/file/tsfile_io_reader.h
index a62ccc14..19bcfea0 100644
--- a/cpp/src/file/tsfile_io_reader.h
+++ b/cpp/src/file/tsfile_io_reader.h
@@ -75,7 +75,7 @@ class TsFileIOReader {
int get_chunk_metadata_list(IDeviceID device_id, std::string measurement,
std::vector<ChunkMeta*>& chunk_meta_list);
- int read_device_meta_index(int32_t start_offset, int32_t end_offset,
+ int read_device_meta_index(int64_t start_offset, int64_t end_offset,
common::PageArena& pa,
MetaIndexNode*& device_meta_index, bool leaf);
int get_timeseries_indexes(
@@ -85,7 +85,7 @@ class TsFileIOReader {
common::PageArena& pa);
private:
- FORCE_INLINE int32_t file_size() const { return read_file_->file_size(); }
+ FORCE_INLINE int64_t file_size() const { return read_file_->file_size(); }
int load_tsfile_meta();
diff --git a/cpp/src/reader/chunk_reader.cc b/cpp/src/reader/chunk_reader.cc
index 91de1e14..1b3160b7 100644
--- a/cpp/src/reader/chunk_reader.cc
+++ b/cpp/src/reader/chunk_reader.cc
@@ -228,7 +228,7 @@ int ChunkReader::read_from_file_and_rewrap(int want_size) {
int ret = E_OK;
const int DEFAULT_READ_SIZE = 4096; // may use page_size + page_header_size
char* file_data_buf = in_stream_.get_wrapped_buf();
- int offset = chunk_meta_->offset_of_chunk_header_ + chunk_visit_offset_;
+ int64_t offset = chunk_meta_->offset_of_chunk_header_ + chunk_visit_offset_;
int read_size =
(want_size < DEFAULT_READ_SIZE ? DEFAULT_READ_SIZE : want_size);
if (file_data_buf_size_ < read_size ||
diff --git a/cpp/src/reader/device_meta_iterator.cc b/cpp/src/reader/device_meta_iterator.cc
index be740852..b1fc939f 100644
--- a/cpp/src/reader/device_meta_iterator.cc
+++ b/cpp/src/reader/device_meta_iterator.cc
@@ -79,8 +79,8 @@ int DeviceMetaIterator::load_leaf_device(MetaIndexNode* meta_index_node) {
continue;
}
}
- int32_t start_offset = child->get_offset();
- int32_t end_offset = i + 1 < leaf_children.size()
+ int64_t start_offset = child->get_offset();
+ int64_t end_offset = i + 1 < leaf_children.size()
? leaf_children[i + 1]->get_offset()
: meta_index_node->end_offset_;
MetaIndexNode* child_node = nullptr;
@@ -104,8 +104,8 @@ int DeviceMetaIterator::load_internal_node(MetaIndexNode* meta_index_node) {
for (size_t i = 0; i < internal_children.size(); i++) {
std::shared_ptr<IMetaIndexEntry> child = internal_children[i];
- int32_t start_offset = child->get_offset();
- int32_t end_offset = (i + 1 < internal_children.size())
+ int64_t start_offset = child->get_offset();
+ int64_t end_offset = (i + 1 < internal_children.size())
? internal_children[i + 1]->get_offset()
: meta_index_node->end_offset_;
diff --git a/cpp/src/reader/filter/tag_filter.cc b/cpp/src/reader/filter/tag_filter.cc
index c4c56a1d..f92c9ef8 100644
--- a/cpp/src/reader/filter/tag_filter.cc
+++ b/cpp/src/reader/filter/tag_filter.cc
@@ -26,7 +26,7 @@ namespace storage {
// TagFilter base class implementation
TagFilter::TagFilter(int col_idx, std::string tag_value)
- : col_idx_(col_idx), value_(std::move(tag_value)), value2_("") {}
+ : value_(std::move(tag_value)), value2_(""), col_idx_(col_idx) {}
TagFilter::~TagFilter() = default;
diff --git a/cpp/src/reader/filter/tag_filter.h b/cpp/src/reader/filter/tag_filter.h
index c7d8843f..b858be8c 100644
--- a/cpp/src/reader/filter/tag_filter.h
+++ b/cpp/src/reader/filter/tag_filter.h
@@ -35,7 +35,8 @@ class TagFilter : public Filter {
TagFilter(int col_idx, std::string tag_value);
~TagFilter() override;
- virtual bool satisfyRow(int time, std::vector<std::string*> segments) const;
+ bool satisfyRow(int time,
+ std::vector<std::string*> segments) const override;
virtual bool satisfyRow(std::vector<std::string*> segments) const;
std::string value_;
diff --git a/cpp/src/writer/chunk_writer.cc b/cpp/src/writer/chunk_writer.cc
index d9f9603d..da181133 100644
--- a/cpp/src/writer/chunk_writer.cc
+++ b/cpp/src/writer/chunk_writer.cc
@@ -160,7 +160,7 @@ int ChunkWriter::end_encode_chunk() {
chunk_header_.data_size_ = chunk_data_.total_size();
chunk_header_.num_of_pages_ = num_of_pages_;
}
- } else if (first_page_statistic_ != nullptr) {
+ } else if (first_page_statistic_->count_ != 0) {
ret = write_first_page_data(chunk_data_, false);
if (E_OK == ret) {
free_first_writer_data();
diff --git a/cpp/src/writer/chunk_writer.h b/cpp/src/writer/chunk_writer.h
index fc1a7113..3032ff9a 100644
--- a/cpp/src/writer/chunk_writer.h
+++ b/cpp/src/writer/chunk_writer.h
@@ -139,10 +139,7 @@ class ChunkWriter {
FORCE_INLINE void free_first_writer_data() {
// free memory
first_page_data_.destroy();
- if (first_page_statistic_ != nullptr) {
- StatisticFactory::free(first_page_statistic_);
- first_page_statistic_ = nullptr;
- }
+ first_page_statistic_->reset();
}
int seal_cur_page(bool end_chunk);
void save_first_page_data(PageWriter& first_page_writer);
diff --git a/cpp/test/reader/tsfile_reader_test.cc b/cpp/test/reader/tsfile_reader_test.cc
index 6d4edd5a..b426d7ec 100644
--- a/cpp/test/reader/tsfile_reader_test.cc
+++ b/cpp/test/reader/tsfile_reader_test.cc
@@ -19,6 +19,7 @@
#include "reader/tsfile_reader.h"
#include <gtest/gtest.h>
+#include <sys/stat.h>
#include <random>
#include <vector>
@@ -52,7 +53,7 @@ class TsFileReaderTest : public ::testing::Test {
void TearDown() override {
delete tsfile_writer_;
- remove(file_name_.c_str());
+ // remove(file_name_.c_str());
libtsfile_destroy();
}
@@ -237,3 +238,65 @@ TEST_F(TsFileReaderTest, GetTimeseriesSchema) {
ASSERT_EQ(measurement_schemas[1].data_type_, TSDataType::INT32);
reader.close();
}
+
+static const int64_t kLargeFileNumRecords = 300000000;
+static const int64_t kLargeFileFlushBatch = 100000;
+
+TEST_F(TsFileReaderTest,
+ DISABLED_LargeFileNoEncodingNoCompression_WriteAndRead) {
+ std::string device_path = "device1";
+ std::string measurement_name = "temperature";
+ common::TSDataType data_type = common::TSDataType::INT64;
+ common::TSEncoding encoding = common::TSEncoding::PLAIN;
+ common::CompressionType compression_type =
+ common::CompressionType::UNCOMPRESSED;
+
+ tsfile_writer_->register_timeseries(
+ device_path, storage::MeasurementSchema(measurement_name, data_type,
+ encoding, compression_type));
+
+ const int64_t start_time = 1622505600000LL;
+ for (int64_t i = 0; i < kLargeFileNumRecords; ++i) {
+ TsRecord record(start_time + i * 1000, device_path);
+ record.add_point(measurement_name, static_cast<int64_t>(i));
+ ASSERT_EQ(tsfile_writer_->write_record(record), E_OK);
+ if ((i + 1) % kLargeFileFlushBatch == 0) {
+ ASSERT_EQ(tsfile_writer_->flush(), E_OK);
+ }
+ }
+ ASSERT_EQ(tsfile_writer_->flush(), E_OK);
+ ASSERT_EQ(tsfile_writer_->close(), E_OK);
+
+ std::vector<std::string> select_list = {"device1.temperature"};
+ const int64_t end_time = start_time + (kLargeFileNumRecords - 1) * 1000 + 1;
+
+ storage::TsFileReader reader;
+ int ret = reader.open(file_name_);
+ ASSERT_EQ(ret, common::E_OK);
+
+ storage::ResultSet* tmp_qds = nullptr;
+ ret = reader.query(select_list, start_time, end_time, tmp_qds);
+ ASSERT_EQ(ret, common::E_OK);
+ ASSERT_NE(tmp_qds, nullptr);
+
+ auto* qds = static_cast<QDSWithoutTimeGenerator*>(tmp_qds);
+ std::shared_ptr<ResultSetMetadata> meta = qds->get_metadata();
+ ASSERT_NE(meta, nullptr);
+ ASSERT_EQ(meta->get_column_type(1), INT64);
+ ASSERT_EQ(meta->get_column_type(2), INT64);
+
+ int64_t row_count = 0;
+ bool has_next = false;
+
+ while (true) {
+ ret = qds->next(has_next);
+ ASSERT_EQ(ret, common::E_OK);
+ if (!has_next) break;
+ row_count++;
+ }
+
+ ASSERT_EQ(row_count, kLargeFileNumRecords);
+
+ reader.destroy_query_data_set(qds);
+ reader.close();
+}