This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new b171c5edb Feature/get-sensor-statistic-and-restorable-writer (#731)
b171c5edb is described below
commit b171c5edb6bd58c79347256f9eeed8d157921d5a
Author: Hongzhi Gao <[email protected]>
AuthorDate: Wed Mar 11 10:36:23 2026 +0800
Feature/get-sensor-statistic-and-restorable-writer (#731)
* Implement the get_all_timeseries_metadata interface to retrieve metadata for
all timeseries in the file
* mvn spotless apply
* Implement RestorableTsFileIOWriter
* Support continued writing to restored files in the tree model & table
model interfaces.
* fix readme logo
* fix readme logo
* fix readme badge
* fix recovery tsfile statistic
* fix recovery tsfile append and reader
* Refactor RestorableTsFileIOWriterTest
* Refactor get_timeseries_metadata
* Refactor get_timeseries_metadata & restorable_tsfile_io_writer.cc
* mvn spotless apply
* fix mem leak and overflow warning
* mvn spotless:apply
* try fix ci
* try fix ci
* try fix ci
* Remove the replay loop and add a recovery API that restores the logical
write_stream position directly from the recovered file size
* spotless apply
* fix restore_recovered_file_position
* fix TsFileWriter::init(RestorableTsFileIOWriter* rw)
* fix RestorableTsFileIOWriter ut
---
.github/workflows/unit-test-cpp.yml | 5 +
.github/workflows/unit-test-python.yml | 5 +
cpp/src/common/allocator/byte_stream.h | 21 +
cpp/src/common/device_id.h | 5 +-
cpp/src/common/schema.h | 59 +-
cpp/src/common/tsfile_common.h | 8 +
cpp/src/encoding/fire.h | 18 +-
cpp/src/encoding/ts2diff_encoder.h | 2 +-
cpp/src/file/restorable_tsfile_io_writer.cc | 845 +++++++++++++++++++++
cpp/src/file/restorable_tsfile_io_writer.h | 132 ++++
cpp/src/file/tsfile_io_writer.cc | 41 +-
cpp/src/file/tsfile_io_writer.h | 26 +-
cpp/src/file/write_file.cc | 51 +-
cpp/src/file/write_file.h | 7 +
cpp/src/reader/tsfile_reader.cc | 55 +-
cpp/src/reader/tsfile_reader.h | 31 +
cpp/src/reader/tsfile_tree_reader.cc | 15 +-
cpp/src/reader/tsfile_tree_reader.h | 31 +-
cpp/src/writer/tsfile_table_writer.cc | 26 +
cpp/src/writer/tsfile_table_writer.h | 14 +
cpp/src/writer/tsfile_tree_writer.cc | 12 +
cpp/src/writer/tsfile_tree_writer.h | 14 +
cpp/src/writer/tsfile_writer.cc | 86 ++-
cpp/src/writer/tsfile_writer.h | 3 +
cpp/test/CMakeLists.txt | 16 +-
cpp/test/file/restorable_tsfile_io_writer_test.cc | 497 ++++++++++++
cpp/test/file/write_file_test.cc | 29 +
.../reader/tree_view/tsfile_reader_tree_test.cc | 17 +-
cpp/test/reader/tsfile_reader_test.cc | 27 +-
cpp/test/writer/tsfile_writer_test.cc | 5 -
30 files changed, 2056 insertions(+), 47 deletions(-)
diff --git a/.github/workflows/unit-test-cpp.yml
b/.github/workflows/unit-test-cpp.yml
index e7a3f1069..1c3495ea6 100644
--- a/.github/workflows/unit-test-cpp.yml
+++ b/.github/workflows/unit-test-cpp.yml
@@ -121,6 +121,8 @@ jobs:
fi
# Run the actual maven build including all tests.
+ # On Windows, prepend MinGW bin to PATH so the test exe can find its runtime
+ # DLLs (e.g. libstdc++-6.dll) when gtest_discover_tests runs it; this avoids
+ # the 0xc0000139 (STATUS_ENTRYPOINT_NOT_FOUND) startup failure.
- name: Build and test with Maven
shell: bash
run: |
@@ -129,6 +131,9 @@ jobs:
else
ASAN_VALUE="OFF"
fi
+ if [[ "$RUNNER_OS" == "Windows" ]]; then
+ export PATH="/c/mingw64/bin:$PATH"
+ fi
./mvnw${{ steps.platform_suffix.outputs.platform_suffix }} -P
with-cpp \
-Denable.asan=$ASAN_VALUE -Dbuild.type=${{ matrix.build_type }}
clean verify
diff --git a/.github/workflows/unit-test-python.yml
b/.github/workflows/unit-test-python.yml
index 28a6d07ca..708f8ec35 100644
--- a/.github/workflows/unit-test-python.yml
+++ b/.github/workflows/unit-test-python.yml
@@ -96,6 +96,11 @@ jobs:
- name: Build and test with Maven
shell: bash
run: |
+ # On Windows, prepend MinGW bin so CTest/test exe uses the correct
+ # runtime DLLs instead of Git-for-Windows mingw runtime from PATH.
+ if [[ "$RUNNER_OS" == "Windows" ]]; then
+ export PATH="/c/mingw64/bin:$PATH"
+ fi
./mvnw${{ steps.platform_suffix.outputs.platform_suffix }} -P
with-python -Denable.asan=OFF -Dbuild.type=Release clean verify
-Dspotless.skip=true
- name: Upload whl Artifact
diff --git a/cpp/src/common/allocator/byte_stream.h
b/cpp/src/common/allocator/byte_stream.h
index 570aa1c13..4e1029ea4 100644
--- a/cpp/src/common/allocator/byte_stream.h
+++ b/cpp/src/common/allocator/byte_stream.h
@@ -459,6 +459,27 @@ class ByteStream {
total_size_.atomic_aaf(used_bytes);
}
+    /**
+     * Moves the logical write position forward by `len` bytes without
+     * copying any payload bytes; total_size_ grows by exactly `len` when the
+     * call succeeds.
+     *
+     * prepare_space() is invoked before every step (presumably so the
+     * current page can take it -- confirm in prepare_space()), and each step
+     * is capped at the bytes left in the current page.
+     *
+     * Intended for recovery: rebuild the logical stream offset from the
+     * on-disk file size instead of replaying the file contents.
+     *
+     * @param len number of bytes to skip.
+     * @return E_OK, or the error reported by prepare_space(); in that case
+     *         total_size_ may have advanced by only part of `len`.
+     */
+    int advance_write_pos(uint32_t len) {
+        int ret = common::E_OK;
+        uint32_t left = len;
+        while (left > 0) {
+            if (RET_FAIL(prepare_space())) {
+                break;
+            }
+            // Bytes still available in the current page.
+            const uint32_t room =
+                page_size_ - (total_size_.load() % page_size_);
+            const uint32_t step = (left < room) ? left : room;
+            total_size_.atomic_aaf(step);
+            left -= step;
+        }
+        return ret;
+    }
+
/* ================ Part 4: reading internal buffers ================ */
/*
* one-shot reader iterator
diff --git a/cpp/src/common/device_id.h b/cpp/src/common/device_id.h
index 50d0d0105..323df9d47 100644
--- a/cpp/src/common/device_id.h
+++ b/cpp/src/common/device_id.h
@@ -148,8 +148,9 @@ class StringArrayDeviceID : public IDeviceID {
if (prefix_segments_.size() == 0 || prefix_segments_.size() == 1) {
return segments_[pos];
} else {
- if (pos < prefix_segments_.size()) {
- return prefix_segments_[pos];
+ if (pos >= 0 &&
+ static_cast<size_t>(pos) < prefix_segments_.size()) {
+ return prefix_segments_[static_cast<size_t>(pos)];
} else {
return segments_[pos - prefix_segments_.size() + 1];
}
diff --git a/cpp/src/common/schema.h b/cpp/src/common/schema.h
index 499dd5bc7..a2c989af2 100644
--- a/cpp/src/common/schema.h
+++ b/cpp/src/common/schema.h
@@ -182,7 +182,7 @@ struct MeasurementSchemaGroup {
*/
class TableSchema {
public:
- TableSchema() = default;
+ TableSchema() : updatable_(true) {}
/**
* Constructs a TableSchema object with the given table name, column
@@ -197,7 +197,7 @@ class TableSchema {
*/
TableSchema(const std::string& table_name,
const std::vector<common::ColumnSchema>& column_schemas)
- : table_name_(table_name) {
+ : table_name_(table_name), updatable_(false) {
to_lowercase_inplace(table_name_);
for (const common::ColumnSchema& column_schema : column_schemas) {
column_schemas_.emplace_back(std::make_shared<MeasurementSchema>(
@@ -217,7 +217,9 @@ class TableSchema {
TableSchema(const std::string& table_name,
const std::vector<MeasurementSchema*>& column_schemas,
const std::vector<common::ColumnCategory>& column_categories)
- : table_name_(table_name), column_categories_(column_categories) {
+ : table_name_(table_name),
+ column_categories_(column_categories),
+ updatable_(false) {
to_lowercase_inplace(table_name_);
for (const auto column_schema : column_schemas) {
if (column_schema != nullptr) {
@@ -236,11 +238,13 @@ class TableSchema {
TableSchema(TableSchema&& other) noexcept
: table_name_(std::move(other.table_name_)),
column_schemas_(std::move(other.column_schemas_)),
- column_categories_(std::move(other.column_categories_)) {}
+ column_categories_(std::move(other.column_categories_)),
+ updatable_(other.updatable_) {}
TableSchema(const TableSchema& other) noexcept
: table_name_(other.table_name_),
- column_categories_(other.column_categories_) {
+ column_categories_(other.column_categories_),
+ updatable_(false) {
for (const auto& column_schema : other.column_schemas_) {
// Just call default construction
column_schemas_.emplace_back(
@@ -342,6 +346,14 @@ class TableSchema {
size_t get_column_pos_index_num() const { return column_pos_index_.size();
}
void update(ChunkGroupMeta* chunk_group_meta) {
+ if (!updatable_) {
+ return;
+ }
+ std::shared_ptr<IDeviceID> device_id = chunk_group_meta->device_id_;
+ const int seg_num = device_id ? device_id->segment_num() : 0;
+ if (seg_num > max_level_) {
+ max_level_ = seg_num;
+ }
for (auto iter = chunk_group_meta->chunk_meta_list_.begin();
iter != chunk_group_meta->chunk_meta_list_.end(); iter++) {
auto& chunk_meta = iter.get();
@@ -371,6 +383,29 @@ class TableSchema {
}
}
+    /**
+     * Completes a schema rebuilt from recovered chunk groups.
+     *
+     * For every device level after segment 0 (levels 1 .. max_level_-1,
+     * where max_level_ is the largest segment_num() seen by update()), a
+     * STRING / PLAIN / UNCOMPRESSED TAG column named "__level<i>" is
+     * prepended. The name -> position index is then rebuilt, and the schema
+     * stops accepting updates. No-op for schemas that are not updatable.
+     */
+    void finalize_column_schema() {
+        if (!updatable_) {
+            return;
+        }
+        const int tag_count = max_level_ > 1 ? max_level_ - 1 : 0;
+        std::vector<std::shared_ptr<MeasurementSchema>> tag_columns;
+        tag_columns.reserve(static_cast<size_t>(tag_count));
+        for (int level = 1; level <= tag_count; ++level) {
+            std::string name = "__level" + std::to_string(level);
+            tag_columns.push_back(std::make_shared<MeasurementSchema>(
+                name, common::STRING, common::PLAIN,
+                common::CompressionType::UNCOMPRESSED));
+        }
+        // Tag columns go in front of the recovered measurement columns.
+        column_schemas_.insert(column_schemas_.begin(), tag_columns.begin(),
+                               tag_columns.end());
+        column_categories_.insert(column_categories_.begin(),
+                                  tag_columns.size(),
+                                  common::ColumnCategory::TAG);
+        column_pos_index_.clear();
+        int index = 0;
+        for (const auto& schema : column_schemas_) {
+            column_pos_index_[to_lower(schema->measurement_name_)] = index++;
+        }
+        updatable_ = false;
+    }
+
std::vector<common::TSDataType> get_data_types() const {
std::vector<common::TSDataType> ret;
for (const auto& measurement_schema : column_schemas_) {
@@ -424,6 +459,8 @@ class TableSchema {
std::vector<common::ColumnCategory> column_categories_;
std::map<std::string, int> column_pos_index_;
bool is_virtual_table_ = false;
+ int max_level_ = 0;
+ bool updatable_ = false;
};
struct Schema {
@@ -433,11 +470,19 @@ struct Schema {
+    /**
+     * Folds one recovered chunk group into the schema of its table.
+     *
+     * The table is looked up by device_id->get_table_name(). On first sight
+     * a default-constructed (hence updatable) TableSchema is created. The
+     * table name is (re)recorded, and the chunk group's columns are merged
+     * in via TableSchema::update().
+     *
+     * NOTE(review): device_id_ is dereferenced unconditionally here, whereas
+     * TableSchema::update() tolerates a null device id -- confirm callers
+     * never pass a chunk group without a device.
+     */
void update_table_schema(ChunkGroupMeta* chunk_group_meta) {
std::shared_ptr<IDeviceID> device_id = chunk_group_meta->device_id_;
- auto table_name = device_id->get_table_name();
+ std::string table_name = device_id->get_table_name();
if (table_schema_map_.find(table_name) == table_schema_map_.end()) {
table_schema_map_[table_name] = std::make_shared<TableSchema>();
}
- table_schema_map_[table_name]->update(chunk_group_meta);
+        // A default-constructed TableSchema has no name yet; set it here.
+ auto& ts = table_schema_map_[table_name];
+ ts->set_table_name(table_name);
+ ts->update(chunk_group_meta);
+ }
+
+    /**
+     * Calls TableSchema::finalize_column_schema() on every registered table.
+     * For schemas rebuilt through update_table_schema(), this prepends their
+     * "__level<i>" TAG columns and freezes them. For non-updatable schemas
+     * it is a no-op.
+     */
+ void finalize_table_schemas() {
+ for (auto& kv : table_schema_map_) {
+ kv.second->finalize_column_schema();
+ }
}
void register_table_schema(
const std::shared_ptr<TableSchema>& table_schema) {
diff --git a/cpp/src/common/tsfile_common.h b/cpp/src/common/tsfile_common.h
index d12c6ed8c..ad2fa5911 100644
--- a/cpp/src/common/tsfile_common.h
+++ b/cpp/src/common/tsfile_common.h
@@ -23,9 +23,11 @@
#include <cstring>
#include <iostream>
#include <map>
+#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
+#include <vector>
#include "common/allocator/my_string.h"
#include "common/allocator/page_arena.h"
@@ -322,6 +324,12 @@ class ITimeseriesIndex {
virtual Statistic* get_statistic() const { return nullptr; }
};
+/**
+ * Ordered map from a device id to the timeseries metadata entries
+ * (ITimeseriesIndex) of that device. Keys are ordered by IDeviceIDComparator
+ * (presumably comparing the pointed-to device ids, not pointer addresses --
+ * confirm in its definition).
+ */
+using DeviceTimeseriesMetadataMap =
+ std::map<std::shared_ptr<IDeviceID>,
+ std::vector<std::shared_ptr<ITimeseriesIndex>>,
+ IDeviceIDComparator>;
+
/*
* A TimeseriesIndex may have one or more chunk metas,
* that means we have such a map: <Timeseries, List<ChunkMeta>>.
diff --git a/cpp/src/encoding/fire.h b/cpp/src/encoding/fire.h
index 9b319a175..0ca3d1680 100644
--- a/cpp/src/encoding/fire.h
+++ b/cpp/src/encoding/fire.h
@@ -92,7 +92,7 @@ class LongFire : public Fire<int64_t> {
+    /**
+     * Predicts the next value: `value` plus the correction
+     * (alpha * delta_) >> bit_width_, where alpha = accumulator_ >> learn_shift_.
+     */
int64_t predict(int64_t value) override {
int64_t alpha = accumulator_ >> learn_shift_;
- int64_t diff = (alpha * delta_) >> bit_width_;
+        // Delegated so the multiplication cannot hit signed-overflow UB.
+ int64_t diff = safe_mul_shift(alpha, delta_, bit_width_);
return value + diff;
}
@@ -101,6 +101,22 @@ class LongFire : public Fire<int64_t> {
accumulator_ -= gradient;
delta_ = val - pre;
}
+
+ private:
+    /**
+     * Returns (alpha * delta) >> shift. The multiplication uses 64-bit
+     * two's-complement wrap-around, and the shift is arithmetic.
+     *
+     * The product is formed on uint64_t, where wrap-around is well defined,
+     * so there is no signed-overflow UB. The result is also bit-identical on
+     * every compiler. That matters because the encoder and the decoder must
+     * reproduce exactly the same prediction.
+     *
+     * Neither an __int128 product nor a double product achieves this:
+     * - __int128 keeps high bits that wrap-around discards;
+     * - double loses precision above 2^53 and rounds toward zero instead of
+     *   flooring like >>.
+     * Choosing between them per platform would make files written on one
+     * build decode differently on another.
+     *
+     * NOTE(review): wrap-around is also what Java `long` arithmetic yields
+     * for `alpha * delta >> shift` (presumably the Java LongFire reference
+     * implementation) -- confirm for cross-language compatibility.
+     *
+     * @param alpha learned coefficient.
+     * @param delta last observed value difference.
+     * @param shift right-shift amount; must be in [0, 63].
+     */
+    static int64_t safe_mul_shift(int64_t alpha, int64_t delta, int shift) {
+        const uint64_t product =
+            static_cast<uint64_t>(alpha) * static_cast<uint64_t>(delta);
+        // uint64 -> int64 conversion is modular, and >> on a negative int64
+        // is arithmetic. Both are implementation-defined before C++20 but
+        // behave this way on GCC, Clang and MSVC, and are guaranteed since
+        // C++20.
+        return static_cast<int64_t>(product) >> shift;
+    }
};
#endif // ENCODING_FIRE_H
diff --git a/cpp/src/encoding/ts2diff_encoder.h
b/cpp/src/encoding/ts2diff_encoder.h
index 18272e3e2..8c5ddafc7 100644
--- a/cpp/src/encoding/ts2diff_encoder.h
+++ b/cpp/src/encoding/ts2diff_encoder.h
@@ -93,7 +93,7 @@ class TS2DIFFEncoder : public Encoder {
public:
TS2DIFFEncoder() { init(); }
- ~TS2DIFFEncoder() {}
+    // Calls destroy() on teardown, the counterpart of the init() call in the
+    // constructor (presumably releasing what init() set up -- confirm in
+    // destroy()).
+ ~TS2DIFFEncoder() { destroy(); }
void reset() { write_index_ = -1; }
diff --git a/cpp/src/file/restorable_tsfile_io_writer.cc
b/cpp/src/file/restorable_tsfile_io_writer.cc
new file mode 100644
index 000000000..d98cdff65
--- /dev/null
+++ b/cpp/src/file/restorable_tsfile_io_writer.cc
@@ -0,0 +1,845 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "file/restorable_tsfile_io_writer.h"
+
+#include <fcntl.h>
+
+#include <algorithm>
+#include <cstring>
+#include <memory>
+#include <vector>
+
+#include "common/allocator/byte_stream.h"
+#include "common/device_id.h"
+#include "common/statistic.h"
+#include "common/tsfile_common.h"
+#include "compress/compressor_factory.h"
+#include "encoding/decoder_factory.h"
+#include "utils/errno_define.h"
+
+#ifdef _WIN32
+#include <io.h>
+#include <sys/stat.h>
+#include <windows.h>
+ssize_t pread(int fd, void* buf, size_t count, uint64_t offset);
+#else
+#include <sys/stat.h>
+#include <unistd.h>
+#endif
+
+using namespace common;
+
+namespace storage {
+
+namespace {
+
+// Length of the TsFile header: the magic string followed by one version byte.
+const int HEADER_LEN = MAGIC_STRING_TSFILE_LEN + 1; // magic + version
+// Size of the scratch buffer used to read chunk and chunk-group headers.
+const int BUF_SIZE = 4096;
+// Bit of ChunkHeader::chunk_type_ marking the time column of an aligned series.
+const unsigned char kTimeChunkTypeMask = 0x80;
+
+// -----------------------------------------------------------------------------
+// Self-check helpers: read file, parse chunk header, recover chunk statistics
+// -----------------------------------------------------------------------------
+
+/**
+ * Lightweight read-only file handle used only by the self-check.
+ *
+ * open() opens a private read-only descriptor that this object owns. It is
+ * closed by close() or by the destructor. init_from_fd() instead borrows a
+ * descriptor that is already open; borrowed descriptors are never closed.
+ *
+ * The file size and read offsets are 64-bit, so files of 2 GiB or more are
+ * not misreported by the size query.
+ */
+struct SelfCheckReader {
+    int fd_;
+    int64_t file_size_;
+    bool own_fd_;  // if false, do not close fd_
+
+    SelfCheckReader() : fd_(-1), file_size_(-1), own_fd_(true) {}
+    ~SelfCheckReader() { close(); }
+
+    // May own a descriptor: a copy would close it a second time.
+    SelfCheckReader(const SelfCheckReader&) = delete;
+    SelfCheckReader& operator=(const SelfCheckReader&) = delete;
+
+    /**
+     * Borrows an already-open descriptor (never closed by this object) and
+     * records the file size.
+     * @return E_OK, E_FILE_OPEN_ERR for a negative fd, or E_FILE_STAT_ERR.
+     */
+    int init_from_fd(int fd) {
+        close();  // release a descriptor owned from an earlier open()
+        fd_ = fd;
+        own_fd_ = false;
+        if (fd_ < 0) {
+            return E_FILE_OPEN_ERR;
+        }
+        return load_file_size();
+    }
+
+    /**
+     * Opens `path` read-only (binary mode on Windows) and records its size.
+     * @return E_OK, E_FILE_OPEN_ERR, or E_FILE_STAT_ERR (descriptor closed).
+     */
+    int open(const std::string& path) {
+        close();  // release a descriptor owned from an earlier open()
+#ifdef _WIN32
+        fd_ = ::_open(path.c_str(), _O_RDONLY | _O_BINARY);
+#else
+        fd_ = ::open(path.c_str(), O_RDONLY);
+#endif
+        if (fd_ < 0) {
+            return E_FILE_OPEN_ERR;
+        }
+        own_fd_ = true;
+        const int ret = load_file_size();
+        if (ret != E_OK) {
+            close();
+        }
+        return ret;
+    }
+
+    /**
+     * Closes the descriptor if it is owned, and resets to the unopened
+     * state. Safe to call repeatedly.
+     */
+    void close() {
+        if (own_fd_ && fd_ >= 0) {
+#ifdef _WIN32
+            ::_close(fd_);
+#else
+            ::close(fd_);
+#endif
+        }
+        fd_ = -1;
+        file_size_ = -1;
+    }
+
+    /** Size captured by open()/init_from_fd(), or -1 when not open. */
+    int64_t file_size() const { return file_size_; }
+
+    /**
+     * Positional read of up to `buf_size` bytes at `offset`. A short read
+     * (0 bytes at EOF) is not an error.
+     * @param read_len set to the number of bytes actually read.
+     * @return E_OK, or E_FILE_READ_ERR when not open, for a negative offset
+     *         or size, or when the underlying read fails.
+     */
+    int read(int64_t offset, char* buf, int32_t buf_size, int32_t& read_len) {
+        read_len = 0;
+        if (fd_ < 0 || offset < 0 || buf_size < 0) {
+            return E_FILE_READ_ERR;
+        }
+#ifdef _WIN32
+        const ssize_t n = ::pread(fd_, buf, static_cast<size_t>(buf_size),
+                                  static_cast<uint64_t>(offset));
+#else
+        const ssize_t n = ::pread(fd_, buf, static_cast<size_t>(buf_size),
+                                  static_cast<off_t>(offset));
+#endif
+        if (n < 0) {
+            return E_FILE_READ_ERR;
+        }
+        read_len = static_cast<int32_t>(n);
+        return E_OK;
+    }
+
+   private:
+    /**
+     * Stores the size of fd_ in file_size_. Uses the 64-bit stat variant on
+     * Windows, where plain _fstat reports a 32-bit size.
+     */
+    int load_file_size() {
+#ifdef _WIN32
+        struct _stat64 st;
+        if (_fstat64(fd_, &st) < 0) {
+            return E_FILE_STAT_ERR;
+        }
+#else
+        struct stat st;
+        if (fstat(fd_, &st) < 0) {
+            return E_FILE_STAT_ERR;
+        }
+#endif
+        file_size_ = static_cast<int64_t>(st.st_size);
+        return E_OK;
+    }
+};
+
+#ifdef _WIN32
+/**
+ * Windows stand-in for POSIX pread(): reads up to `count` bytes at absolute
+ * `offset` through ReadFile with the offset carried in an OVERLAPPED struct.
+ * Returns the number of bytes read, or -1 on failure. A failure reported as
+ * ERROR_HANDLE_EOF is not treated as an error; read_bytes is returned.
+ *
+ * NOTE(review): this definition is inside the anonymous namespace, while
+ * SelfCheckReader::read() calls the global ::pread declared next to the
+ * includes, so that call does not bind here. Presumably the global one is
+ * defined in another translation unit -- confirm, and drop this copy if it
+ * is unused.
+ * NOTE(review): for a handle opened without FILE_FLAG_OVERLAPPED, ReadFile
+ * with an OVERLAPPED offset also moves the file pointer (unlike POSIX
+ * pread). `count` is truncated to DWORD.
+ */
+ssize_t pread(int fd, void* buf, size_t count, uint64_t offset) {
+ DWORD read_bytes = 0;
+ OVERLAPPED ov = {};
+ ov.OffsetHigh = (DWORD)((offset >> 32) & 0xFFFFFFFF);
+ ov.Offset = (DWORD)(offset & 0xFFFFFFFF);
+ HANDLE h = (HANDLE)_get_osfhandle(fd);
+ if (!ReadFile(h, buf, (DWORD)count, &read_bytes, &ov)) {
+ if (GetLastError() != ERROR_HANDLE_EOF) {
+ return -1;
+ }
+ }
+ return (ssize_t)read_bytes;
+}
+#endif
+
+/**
+ * Parses the chunk header that starts at `chunk_start` and computes the
+ * chunk's total size (header + data) without reading the chunk data. Used
+ * to advance the self-check scan from one chunk to the next.
+ *
+ * @param reader         open self-check reader for the file.
+ * @param chunk_start    file offset of the chunk header's first byte.
+ * @param bytes_consumed set to header_len + data_size on success.
+ * @param header_out     if non-null, filled with the deserialized header.
+ * @return E_OK on success. E_TSFILE_CORRUPTED if the header cannot be read
+ *         or parsed, declares a negative data size, or the chunk would
+ *         extend past the end of the file.
+ */
+static int parse_chunk_header_and_skip(SelfCheckReader& reader,
+                                       int64_t chunk_start,
+                                       int64_t& bytes_consumed,
+                                       ChunkHeader* header_out = nullptr) {
+    const int64_t file_size = reader.file_size();
+    if (chunk_start < 0 || chunk_start >= file_size) {
+        return E_TSFILE_CORRUPTED;
+    }
+    // The header is read through a bounded scratch buffer of BUF_SIZE bytes.
+    const int32_t max_read = static_cast<int32_t>(
+        std::min(static_cast<int64_t>(BUF_SIZE), file_size - chunk_start));
+    if (max_read < ChunkHeader::MIN_SERIALIZED_SIZE) {
+        return E_TSFILE_CORRUPTED;
+    }
+
+    std::vector<char> buf(static_cast<size_t>(max_read));
+    int32_t read_len = 0;
+    int ret = reader.read(chunk_start, buf.data(), max_read, read_len);
+    if (ret != E_OK || read_len < ChunkHeader::MIN_SERIALIZED_SIZE) {
+        return E_TSFILE_CORRUPTED;
+    }
+
+    ByteStream bs;
+    bs.wrap_from(buf.data(), read_len);
+
+    ChunkHeader header;
+    ret = header.deserialize_from(bs);
+    if (ret != E_OK) {
+        return E_TSFILE_CORRUPTED;
+    }
+
+    // Combine sizes in 64-bit arithmetic: int + int32 could overflow for a
+    // corrupt header before being widened.
+    const int64_t header_len = static_cast<int64_t>(bs.read_pos());
+    const int64_t data_size = static_cast<int64_t>(header.data_size_);
+    // A negative size (e.g. garbage left by a crash) would make the caller's
+    // scan position move backwards or stay put, so the scan would revisit
+    // the same bytes indefinitely.
+    if (header_len <= 0 || data_size < 0) {
+        return E_TSFILE_CORRUPTED;
+    }
+    const int64_t total = header_len + data_size;
+    if (chunk_start + total > file_size) {
+        return E_TSFILE_CORRUPTED;
+    }
+
+    if (header_out != nullptr) {
+        *header_out = header;
+    }
+    bytes_consumed = total;
+    return E_OK;
+}
+
+/**
+ * Releases, on scope exit, a compressor and the buffer it produced through
+ * uncompress().
+ */
+struct PageUncompressGuard {
+    Compressor* compressor = nullptr;
+    char* buf = nullptr;
+
+    PageUncompressGuard() = default;
+    PageUncompressGuard(const PageUncompressGuard&) = delete;
+    PageUncompressGuard& operator=(const PageUncompressGuard&) = delete;
+    ~PageUncompressGuard() {
+        if (compressor == nullptr) {
+            return;
+        }
+        if (buf != nullptr) {
+            compressor->after_uncompress(buf);
+        }
+        CompressorFactory::free(compressor);
+    }
+};
+
+/** Returns a DecoderFactory-allocated decoder to the factory on scope exit. */
+struct DecoderGuard {
+    explicit DecoderGuard(Decoder* d) : decoder(d) {}
+    DecoderGuard(const DecoderGuard&) = delete;
+    DecoderGuard& operator=(const DecoderGuard&) = delete;
+    ~DecoderGuard() {
+        if (decoder != nullptr) {
+            DecoderFactory::free(decoder);
+        }
+    }
+    Decoder* decoder;
+};
+
+/**
+ * Decodes every timestamp of the encoded time stream in buf[0, len) and
+ * appends them to `out`. Stops at the first read error.
+ * @return E_OK, or E_OOM if no time decoder can be allocated.
+ */
+static int decode_all_timestamps(char* buf, uint32_t len,
+                                 std::vector<int64_t>& out) {
+    DecoderGuard time_decoder(DecoderFactory::alloc_time_decoder());
+    if (time_decoder.decoder == nullptr) {
+        return common::E_OOM;
+    }
+    common::ByteStream in;
+    in.wrap_from(buf, len);
+    time_decoder.decoder->reset();
+    int64_t t = 0;
+    while (time_decoder.decoder->has_remaining(in)) {
+        if (time_decoder.decoder->read_int64(t, in) != common::E_OK) {
+            break;
+        }
+        out.push_back(t);
+    }
+    return common::E_OK;
+}
+
+/**
+ * Merges the statistic of every page header of a multi-page chunk into
+ * out_stat, skipping the page bodies.
+ * @return E_OK; the PageHeader error; or E_TSFILE_CORRUPTED if a merge fails
+ *         or a page body runs past the end of the chunk data.
+ */
+static int merge_page_statistics(const ChunkHeader& chdr,
+                                 common::ByteStream& bs, Statistic* out_stat) {
+    while (bs.remaining_size() > 0) {
+        PageHeader ph;
+        int ret = ph.deserialize_from(bs, true, chdr.data_type_);
+        if (ret != common::E_OK) {
+            return ret;
+        }
+        const uint32_t comp = ph.compressed_size_;
+        // Reject a truncated page before merging, instead of advancing past
+        // the end of the wrapped buffer.
+        if (comp > bs.remaining_size()) {
+            ph.reset();
+            return common::E_TSFILE_CORRUPTED;
+        }
+        if (ph.statistic_ != nullptr &&
+            out_stat->merge_with(ph.statistic_) != common::E_OK) {
+            ph.reset();
+            return common::E_TSFILE_CORRUPTED;
+        }
+        ph.reset();
+        bs.wrapped_buf_advance_read_pos(comp);
+    }
+    return E_OK;
+}
+
+/**
+ * Reads the next value of type chdr.data_type_ from `in` and, if the read
+ * succeeds, adds (t, value) to `stat`. Unsupported types are ignored, and
+ * TEXT/BLOB/STRING values are only read when `pa` is non-null.
+ */
+static void update_stat_with_next_value(const ChunkHeader& chdr,
+                                        Decoder& decoder,
+                                        common::ByteStream& in, int64_t t,
+                                        Statistic* stat,
+                                        common::PageArena* pa) {
+    switch (chdr.data_type_) {
+        case common::BOOLEAN: {
+            bool v;
+            if (decoder.read_boolean(v, in) == common::E_OK) {
+                stat->update(t, v);
+            }
+            break;
+        }
+        case common::INT32:
+        case common::DATE: {
+            int32_t v;
+            if (decoder.read_int32(v, in) == common::E_OK) {
+                stat->update(t, v);
+            }
+            break;
+        }
+        case common::INT64:
+        case common::TIMESTAMP: {
+            int64_t v;
+            if (decoder.read_int64(v, in) == common::E_OK) {
+                stat->update(t, v);
+            }
+            break;
+        }
+        case common::FLOAT: {
+            float v;
+            if (decoder.read_float(v, in) == common::E_OK) {
+                stat->update(t, v);
+            }
+            break;
+        }
+        case common::DOUBLE: {
+            double v;
+            if (decoder.read_double(v, in) == common::E_OK) {
+                stat->update(t, v);
+            }
+            break;
+        }
+        case common::TEXT:
+        case common::BLOB:
+        case common::STRING: {
+            common::String v;
+            if (pa != nullptr &&
+                decoder.read_String(v, *pa, in) == common::E_OK) {
+                stat->update(t, v);
+            }
+            break;
+        }
+        default:
+            break;
+    }
+}
+
+/**
+ * Recovers a chunk-level statistic from raw chunk bytes, so that correct tail
+ * metadata can be generated after recovery (aligned with Java
+ * TsFileSequenceReader selfCheck).
+ *
+ * - Multi-page chunk (low 6 bits of chunk_type_ == CHUNK_HEADER_MARKER):
+ *   the statistic of every page header is merged into out_stat.
+ * - Single-page chunk: the page is decompressed and decoded, and every
+ *   decoded point updates out_stat. Three layouts are handled:
+ *   - time chunk (kTimeChunkTypeMask set): timestamps only. They are also
+ *     returned through out_time_batch when it is non-null.
+ *   - aligned value chunk (time_batch non-empty): uint32 row count +
+ *     not-null bitmap + encoded values of the non-null rows. Each value is
+ *     paired with the timestamp of its own row.
+ *   - otherwise: var_uint(time_buf_size) + time_buf + value_buf.
+ *
+ * @param chdr           header of the chunk.
+ * @param chunk_data     chunk payload (bytes after the header).
+ * @param data_size      number of bytes in chunk_data.
+ * @param out_stat       statistic to update; no-op when null.
+ * @param pa             arena for decoded TEXT/BLOB/STRING values; such
+ *                       values are skipped when null.
+ * @param time_batch     timestamps decoded from the time chunk of the same
+ *                       chunk group (aligned value chunks only).
+ * @param out_time_batch receives the timestamps of a single-page time chunk.
+ * @return E_OK, including for empty input or an aligned page too short for
+ *         its row count/bitmap. Otherwise E_TSFILE_CORRUPTED for malformed
+ *         pages, E_OOM when a codec cannot be allocated, or the error of a
+ *         page/codec call.
+ */
+static int recover_chunk_statistic(
+    const ChunkHeader& chdr, const char* chunk_data, int32_t data_size,
+    Statistic* out_stat, common::PageArena* pa,
+    const std::vector<int64_t>* time_batch = nullptr,
+    std::vector<int64_t>* out_time_batch = nullptr) {
+    if (chunk_data == nullptr || data_size <= 0 || out_stat == nullptr) {
+        return E_OK;
+    }
+    common::ByteStream bs;
+    bs.wrap_from(const_cast<char*>(chunk_data),
+                 static_cast<uint32_t>(data_size));
+
+    // Multi-page chunk: low 6 bits of chunk_type_ equal CHUNK_HEADER_MARKER,
+    // and every page header carries its own statistic.
+    const bool multi_page =
+        (static_cast<unsigned char>(chdr.chunk_type_) & 0x3F) ==
+        static_cast<unsigned char>(CHUNK_HEADER_MARKER);
+    if (multi_page) {
+        return merge_page_statistics(chdr, bs, out_stat);
+    }
+
+    // Single-page chunk: the page header has no statistic, so the page is
+    // decompressed and decoded to fill out_stat.
+    const bool is_time_column = (static_cast<unsigned char>(chdr.chunk_type_) &
+                                 kTimeChunkTypeMask) != 0;
+    PageHeader ph;
+    int ret = ph.deserialize_from(bs, false, chdr.data_type_);
+    if (ret != common::E_OK || ph.compressed_size_ == 0 ||
+        bs.remaining_size() < ph.compressed_size_) {
+        // Align with Java selfCheck behavior: a malformed/incomplete page in
+        // this chunk is treated as corrupted data.
+        return common::E_TSFILE_CORRUPTED;
+    }
+    const char* compressed_ptr =
+        chunk_data + (data_size - static_cast<int32_t>(bs.remaining_size()));
+
+    PageUncompressGuard page;
+    page.compressor =
+        CompressorFactory::alloc_compressor(chdr.compression_type_);
+    if (page.compressor == nullptr) {
+        return common::E_OOM;
+    }
+    ret = page.compressor->reset(false);
+    if (ret != common::E_OK) {
+        return ret;
+    }
+    uint32_t uncompressed_size = 0;
+    ret = page.compressor->uncompress(const_cast<char*>(compressed_ptr),
+                                      ph.compressed_size_, page.buf,
+                                      uncompressed_size);
+    if (ret != common::E_OK || page.buf == nullptr ||
+        uncompressed_size != ph.uncompressed_size_) {
+        return (ret == common::E_OK) ? common::E_TSFILE_CORRUPTED : ret;
+    }
+    char* const page_buf = page.buf;
+
+    if (is_time_column) {
+        // Time chunk: the page is the raw encoded time stream (no var_uint).
+        std::vector<int64_t> times;
+        ret = decode_all_timestamps(page_buf, uncompressed_size, times);
+        if (ret != common::E_OK) {
+            return ret;
+        }
+        for (const int64_t t : times) {
+            out_stat->update(t);
+        }
+        if (out_time_batch != nullptr) {
+            out_time_batch->swap(times);
+        }
+        return E_OK;
+    }
+
+    // Value chunk. value_times[i] is the timestamp of the i-th encoded value
+    // in value_buf.
+    std::vector<int64_t> value_times;
+    const char* value_buf = nullptr;
+    uint32_t value_buf_size = 0;
+
+    if (time_batch != nullptr && !time_batch->empty()) {
+        // Aligned value page: big-endian uint32 row count + not-null bitmap
+        // + encoded values of the non-null rows only.
+        if (uncompressed_size < 4) {
+            return E_OK;
+        }
+        const unsigned char* raw =
+            reinterpret_cast<const unsigned char*>(page_buf);
+        const uint32_t num_rows = (static_cast<uint32_t>(raw[0]) << 24) |
+                                  (static_cast<uint32_t>(raw[1]) << 16) |
+                                  (static_cast<uint32_t>(raw[2]) << 8) |
+                                  static_cast<uint32_t>(raw[3]);
+        // 64-bit math so a huge row count cannot wrap the size check.
+        const uint64_t bitmap_size =
+            (static_cast<uint64_t>(num_rows) + 7) / 8;
+        if (static_cast<uint64_t>(uncompressed_size) < 4 + bitmap_size) {
+            return E_OK;
+        }
+        // Row r has a value iff bit (0x80 >> (r % 8)) of bitmap[r / 8] is
+        // set (MSB-first, as written by the aligned value page writer /
+        // read by Java ValuePageReader). Pairing values with rows by
+        // position instead would attach values after a null row to the
+        // wrong timestamps and skew the recovered start/end times.
+        const unsigned char* bitmap = raw + 4;
+        const size_t rows =
+            std::min(static_cast<size_t>(num_rows), time_batch->size());
+        for (size_t row = 0; row < rows; ++row) {
+            if ((bitmap[row / 8] & (0x80u >> (row % 8))) != 0) {
+                value_times.push_back((*time_batch)[row]);
+            }
+        }
+        value_buf = page_buf + 4 + static_cast<size_t>(bitmap_size);
+        value_buf_size =
+            uncompressed_size - 4 - static_cast<uint32_t>(bitmap_size);
+    } else {
+        // Non-aligned value page: var_uint(time_buf_size) + time_buf +
+        // value_buf.
+        int var_size = 0;
+        uint32_t time_buf_size = 0;
+        ret = common::SerializationUtil::read_var_uint(
+            time_buf_size, page_buf, static_cast<int>(uncompressed_size),
+            &var_size);
+        // 64-bit sum so a corrupt, huge time_buf_size cannot wrap the check.
+        if (ret != common::E_OK ||
+            static_cast<uint64_t>(var_size) + time_buf_size >
+                uncompressed_size) {
+            return (ret == common::E_OK) ? common::E_TSFILE_CORRUPTED : ret;
+        }
+        char* time_buf = page_buf + var_size;
+        value_buf = time_buf + time_buf_size;
+        value_buf_size = uncompressed_size -
+                         static_cast<uint32_t>(var_size) - time_buf_size;
+        ret = decode_all_timestamps(time_buf, time_buf_size, value_times);
+        if (ret != common::E_OK) {
+            return ret;
+        }
+    }
+
+    DecoderGuard value_decoder(DecoderFactory::alloc_value_decoder(
+        chdr.encoding_type_, chdr.data_type_));
+    if (value_decoder.decoder == nullptr) {
+        return common::E_OOM;
+    }
+    common::ByteStream value_in;
+    value_in.wrap_from(const_cast<char*>(value_buf), value_buf_size);
+    value_decoder.decoder->reset();
+    for (size_t idx = 0; idx < value_times.size() &&
+                         value_decoder.decoder->has_remaining(value_in);
+         ++idx) {
+        update_stat_with_next_value(chdr, *value_decoder.decoder, value_in,
+                                    value_times[idx], out_stat, pa);
+    }
+    return E_OK;
+}
+
+} // namespace
+
+/**
+ * Creates a writer with no file attached (write_file_ == nullptr,
+ * can_write_ == false); see open().
+ *
+ * self_check_arena_ backs the ChunkGroupMeta, ChunkMeta and Statistic
+ * objects rebuilt by self_check(). In init(512, MOD_TSFILE_READER), the 512
+ * is presumably the arena page size -- confirm in PageArena::init().
+ */
+RestorableTsFileIOWriter::RestorableTsFileIOWriter()
+ : TsFileIOWriter(),
+ write_file_(nullptr),
+ write_file_owned_(false),
+ truncated_size_(-1),
+ crashed_(false),
+ can_write_(false) {
+ self_check_arena_.init(512, MOD_TSFILE_READER);
+}
+
+/** Releases the owned WriteFile and all self-check recovery state via close(). */
+RestorableTsFileIOWriter::~RestorableTsFileIOWriter() { close(); }
+
+/**
+ * Closes and deletes the WriteFile if this object owns it. Then drops the
+ * recovery state built by self_check().
+ *
+ * The recovered ChunkGroupMeta objects were placement-new'ed into
+ * self_check_arena_ by self_check(), and nothing here runs their
+ * destructors. Their device_id_ shared_ptrs are therefore reset by hand
+ * before the arena memory is released.
+ *
+ * NOTE(review): if the TsFileIOWriter base still references these
+ * ChunkGroupMeta objects (e.g. for tail metadata), it must not touch them
+ * after this call. That includes ~TsFileIOWriter(), which runs after this
+ * class's destructor has called close(). Confirm.
+ * NOTE(review): the arena is destroyed but not re-initialized, so a later
+ * open() on the same object (e.g. after a failed open()) allocates from a
+ * destroyed arena unless PageArena allows that -- confirm.
+ */
+void RestorableTsFileIOWriter::close() {
+ if (write_file_owned_ && write_file_ != nullptr) {
+ write_file_->close();
+ delete write_file_;
+ write_file_ = nullptr;
+ write_file_owned_ = false;
+ }
+    // Drop device-id references held by arena-resident chunk group metas.
+ for (ChunkGroupMeta* cgm : self_check_recovered_cgm_) {
+ cgm->device_id_.reset();
+ }
+ self_check_recovered_cgm_.clear();
+ self_check_arena_.destroy();
+}
+
+/**
+ * Opens `file_path` for recovery, creating it if absent, and runs the
+ * self-check.
+ *
+ * The file is opened read/write without truncation, so existing content is
+ * preserved. If creating the file or the self-check fails, the writer is
+ * closed again before returning.
+ *
+ * @param file_path          TsFile to open or create.
+ * @param truncate_corrupted forwarded to self_check().
+ * @return E_ALREADY_EXIST if a file is already attached; otherwise the
+ *         result of WriteFile::create() or of self_check().
+ */
+int RestorableTsFileIOWriter::open(const std::string& file_path,
+                                   bool truncate_corrupted) {
+    if (write_file_ != nullptr) {
+        return E_ALREADY_EXIST;
+    }
+
+    file_path_ = file_path;
+    write_file_ = new WriteFile();
+    write_file_owned_ = true;
+
+    // Read/write + create, deliberately without O_TRUNC.
+#ifdef _WIN32
+    const int open_flags = O_RDWR | O_CREAT | O_BINARY;
+#else
+    const int open_flags = O_RDWR | O_CREAT;
+#endif
+    const mode_t file_mode = 0644;
+
+    int ret = write_file_->create(file_path_, open_flags, file_mode);
+    if (ret == E_OK) {
+        ret = self_check(truncate_corrupted);
+    }
+    if (ret != E_OK) {
+        close();
+    }
+    return ret;
+}
+
+int RestorableTsFileIOWriter::self_check(bool truncate_corrupted) {
+ SelfCheckReader reader;
+ // Use a separate read-only handle for self-check: on Windows, sharing the
+ // O_RDWR fd can cause stale/cached reads when detecting a complete file.
+ int ret = reader.open(file_path_);
+ if (ret != E_OK) {
+ return ret;
+ }
+
+ int32_t file_size = reader.file_size();
+
+ // --- Empty file: treat as crashed, allow writing from scratch ---
+ if (file_size == 0) {
+ reader.close();
+ truncated_size_ = 0;
+ crashed_ = true;
+ can_write_ = true;
+ if (write_file_->seek_to_end() != E_OK) {
+ return E_FILE_READ_ERR;
+ }
+ ret = init(write_file_);
+ if (ret != E_OK) {
+ return ret;
+ }
+ ret = start_file();
+ if (ret != E_OK) {
+ return ret;
+ }
+ return E_OK;
+ }
+
+ // --- File too short or invalid header => not a valid TsFile ---
+ if (file_size < HEADER_LEN) {
+ reader.close();
+ truncated_size_ = TSFILE_CHECK_INCOMPATIBLE;
+ return E_TSFILE_CORRUPTED;
+ }
+
+ char header_buf[HEADER_LEN];
+ int32_t read_len = 0;
+ ret = reader.read(0, header_buf, HEADER_LEN, read_len);
+ if (ret != E_OK || read_len != HEADER_LEN) {
+ reader.close();
+ truncated_size_ = TSFILE_CHECK_INCOMPATIBLE;
+ return E_TSFILE_CORRUPTED;
+ }
+
+ if (memcmp(header_buf, MAGIC_STRING_TSFILE, MAGIC_STRING_TSFILE_LEN) != 0)
{
+ reader.close();
+ truncated_size_ = TSFILE_CHECK_INCOMPATIBLE;
+ return E_TSFILE_CORRUPTED;
+ }
+
+ if (header_buf[MAGIC_STRING_TSFILE_LEN] != VERSION_NUM_BYTE) {
+ reader.close();
+ truncated_size_ = TSFILE_CHECK_INCOMPATIBLE;
+ return E_TSFILE_CORRUPTED;
+ }
+
+ // --- Completeness check (aligned with Java isComplete()) ---
+ // Require size >= 2*magic + version_byte and tail magic same as head
magic.
+ bool is_complete = false;
+ if (file_size >= static_cast<int32_t>(MAGIC_STRING_TSFILE_LEN * 2 + 1)) {
+ char tail_buf[MAGIC_STRING_TSFILE_LEN];
+ ret = reader.read(file_size - MAGIC_STRING_TSFILE_LEN, tail_buf,
+ MAGIC_STRING_TSFILE_LEN, read_len);
+ if (ret == E_OK && read_len == MAGIC_STRING_TSFILE_LEN &&
+ memcmp(tail_buf, MAGIC_STRING_TSFILE, MAGIC_STRING_TSFILE_LEN) ==
+ 0) {
+ is_complete = true;
+ }
+ }
+
+ // --- File is complete: no recovery, close write handle and return ---
+ if (is_complete) {
+ reader.close();
+ truncated_size_ = TSFILE_CHECK_COMPLETE;
+ crashed_ = false;
+ can_write_ = false;
+ write_file_->close();
+ delete write_file_;
+ write_file_ = nullptr;
+ write_file_owned_ = false;
+ return E_OK;
+ }
+
+ // --- Recovery path: scan from header to find last valid truncation point
+ // ---
+ int64_t truncated = HEADER_LEN;
+ int64_t pos = HEADER_LEN;
+ std::vector<char> buf(BUF_SIZE);
+
+ // Recover schema and chunk group meta (aligned with Java selfCheck).
+ // cur_group_time_batch: timestamps decoded from time chunk, used by
aligned
+ // value chunks.
+ std::shared_ptr<IDeviceID> cur_device_id;
+ ChunkGroupMeta* cur_cgm = nullptr;
+ std::vector<ChunkGroupMeta*> recovered_cgm_list;
+ std::vector<int64_t> cur_group_time_batch;
+
+ auto flush_chunk_group = [this, &cur_device_id, &cur_cgm,
+ &recovered_cgm_list]() {
+ if (cur_cgm != nullptr && cur_device_id != nullptr) {
+ get_schema()->update_table_schema(cur_cgm);
+ recovered_cgm_list.push_back(cur_cgm);
+ self_check_recovered_cgm_.push_back(cur_cgm);
+ cur_cgm = nullptr;
+ }
+ };
+
+ while (pos < file_size) {
+ unsigned char marker;
+ ret = reader.read(static_cast<int32_t>(pos),
+ reinterpret_cast<char*>(&marker), 1, read_len);
+ if (ret != E_OK || read_len != 1) {
+ break;
+ }
+ pos += 1;
+
+ if (marker == static_cast<unsigned char>(SEPARATOR_MARKER)) {
+ truncated = pos - 1;
+ flush_chunk_group();
+ break;
+ }
+
+ if (marker == static_cast<unsigned char>(CHUNK_GROUP_HEADER_MARKER)) {
+ truncated = pos - 1;
+ flush_chunk_group();
+ cur_group_time_batch.clear();
+ int seg_len = 0;
+ ret = reader.read(static_cast<int32_t>(pos), buf.data(), BUF_SIZE,
+ read_len);
+ if (ret != E_OK || read_len < 1) {
+ break;
+ }
+ ByteStream bs;
+ bs.wrap_from(buf.data(), read_len);
+ cur_device_id = std::make_shared<StringArrayDeviceID>("init");
+ ret = cur_device_id->deserialize(bs);
+ if (ret != E_OK) {
+ break;
+ }
+ seg_len = bs.read_pos();
+ pos += seg_len;
+ cur_cgm = new (self_check_arena_.alloc(sizeof(ChunkGroupMeta)))
+ ChunkGroupMeta(&self_check_arena_);
+ cur_cgm->init(cur_device_id);
+ continue;
+ }
+
+ if (marker == static_cast<unsigned char>(OPERATION_INDEX_RANGE)) {
+ truncated = pos - 1;
+ flush_chunk_group();
+ cur_device_id.reset();
+ if (pos + 2 * 8 > static_cast<int64_t>(file_size)) {
+ break;
+ }
+ char range_buf[16];
+ ret =
+ reader.read(static_cast<int32_t>(pos), range_buf, 16,
read_len);
+ if (ret != E_OK || read_len != 16) {
+ break;
+ }
+ pos += 16;
+ truncated = pos;
+ continue;
+ }
+
+ if (marker == static_cast<unsigned char>(CHUNK_HEADER_MARKER) ||
+ marker ==
+ static_cast<unsigned char>(ONLY_ONE_PAGE_CHUNK_HEADER_MARKER)
||
+ (marker & 0x3F) ==
+ static_cast<unsigned char>(CHUNK_HEADER_MARKER) ||
+ (marker & 0x3F) ==
+ static_cast<unsigned char>(ONLY_ONE_PAGE_CHUNK_HEADER_MARKER))
{
+ int64_t chunk_start = pos - 1;
+ int64_t consumed = 0;
+ ChunkHeader chdr;
+ ret = parse_chunk_header_and_skip(reader, chunk_start, consumed,
+ &chdr);
+ if (ret != E_OK) {
+ break;
+ }
+ pos = chunk_start + consumed;
+ truncated = pos;
+ if (cur_cgm != nullptr) {
+ void* cm_buf = self_check_arena_.alloc(sizeof(ChunkMeta));
+ if (IS_NULL(cm_buf)) {
+ ret = common::E_OOM;
+ break;
+ }
+ auto* cm = new (cm_buf) ChunkMeta();
+ common::String mname;
+ mname.dup_from(chdr.measurement_name_, self_check_arena_);
+ Statistic* stat = StatisticFactory::alloc_statistic_with_pa(
+ static_cast<common::TSDataType>(chdr.data_type_),
+ &self_check_arena_);
+ if (IS_NULL(stat)) {
+ ret = common::E_OOM;
+ break;
+ }
+ stat->reset();
+ if (chdr.data_size_ > 0) {
+ const int32_t header_len =
+ static_cast<int32_t>(consumed) - chdr.data_size_;
+ if (header_len > 0 && chunk_start + consumed <=
+ static_cast<int64_t>(file_size))
{
+ std::vector<char> chunk_data(chdr.data_size_);
+ int32_t read_len = 0;
+ ret = reader.read(
+ static_cast<int32_t>(chunk_start + header_len),
+ chunk_data.data(), chdr.data_size_, read_len);
+ if (ret == E_OK && read_len == chdr.data_size_) {
+ ret = recover_chunk_statistic(
+ chdr, chunk_data.data(), chdr.data_size_, stat,
+ &self_check_arena_, &cur_group_time_batch,
+ &cur_group_time_batch);
+ }
+ if (ret != E_OK) {
+ break;
+ }
+ }
+ }
+ cm->init(mname,
+ static_cast<common::TSDataType>(chdr.data_type_),
+ chunk_start, stat, 0,
+ static_cast<common::TSEncoding>(chdr.encoding_type_),
+ static_cast<common::CompressionType>(
+ chdr.compression_type_),
+ self_check_arena_);
+ cur_cgm->push(cm);
+ if (cur_device_id != nullptr &&
+ (static_cast<unsigned char>(chdr.chunk_type_) &
+ kTimeChunkTypeMask) != 0) {
+ // For aligned series, a time chunk implies this device
+ // uses aligned layout. Record it so recovered writer state
+ // can keep alignment behavior consistent.
+ aligned_devices_.insert(cur_device_id->get_table_name());
+ }
+ }
+ continue;
+ }
+
+ truncated_size_ = TSFILE_CHECK_INCOMPATIBLE;
+ flush_chunk_group();
+ reader.close();
+ return E_TSFILE_CORRUPTED;
+ }
+
+ flush_chunk_group();
+ get_schema()->finalize_table_schemas();
+ reader.close();
+ truncated_size_ = truncated;
+
+ // --- Optionally truncate file to last valid offset ---
+ if (truncate_corrupted && truncated < static_cast<int64_t>(file_size)) {
+ ret = write_file_->truncate(truncated);
+ if (ret != E_OK) {
+ return ret;
+ }
+ }
+
+ if (write_file_->seek_to_end() != E_OK) {
+ return E_FILE_READ_ERR;
+ }
+
+ crashed_ = true;
+ can_write_ = true;
+
+ ret = init(write_file_);
+ if (ret != E_OK) {
+ return ret;
+ }
+
+ // --- Restore write_stream_ logical position from existing file size ---
+ const int64_t restored_size = write_file_->get_position();
+ if (restored_size > 0) {
+ ret = restore_recovered_file_position(restored_size);
+ if (ret != E_OK) {
+ return ret;
+ }
+ }
+
+ // --- Attach recovered ChunkGroupMeta to writer; destroy() will not free
+ // them ---
+ for (ChunkGroupMeta* cgm : recovered_cgm_list) {
+ push_chunk_group_meta(cgm);
+ }
+ chunk_group_meta_from_recovery_ = true;
+
+ return E_OK;
+}
+
+// True when self-check recorded `device` (a table name) as having a time
+// chunk, i.e. as an aligned device.
+bool RestorableTsFileIOWriter::is_device_aligned(
+    const std::string& device) const {
+    return aligned_devices_.count(device) != 0;
+}
+
+// Exposes this object as a TsFileIOWriter only while appending is allowed.
+TsFileIOWriter* RestorableTsFileIOWriter::get_tsfile_io_writer() {
+    if (!can_write_) {
+        return nullptr;
+    }
+    return this;
+}
+
+// Hands out the (non-owned by caller) write handle only while appending is
+// allowed; nullptr otherwise.
+WriteFile* RestorableTsFileIOWriter::get_write_file() {
+    if (!can_write_) {
+        return nullptr;
+    }
+    return write_file_;
+}
+
+// Returns a copy of the stored file path.
+std::string RestorableTsFileIOWriter::get_file_path() const {
+    return std::string(file_path_);
+}
+
+} // namespace storage
diff --git a/cpp/src/file/restorable_tsfile_io_writer.h
b/cpp/src/file/restorable_tsfile_io_writer.h
new file mode 100644
index 000000000..051bf7d8c
--- /dev/null
+++ b/cpp/src/file/restorable_tsfile_io_writer.h
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef FILE_RESTORABLE_TSFILE_IO_WRITER_H
+#define FILE_RESTORABLE_TSFILE_IO_WRITER_H
+
+#include <memory>
+#include <set>
+#include <string>
+#include <vector>
+
+#include "common/schema.h"
+#include "common/tsfile_common.h"
+#include "file/tsfile_io_writer.h"
+#include "file/write_file.h"
+
+namespace storage {
+
+/**
+ * TsFile check status constants for the self-check result, reported by
+ * get_truncated_size() instead of a truncation offset.
+ * TSFILE_CHECK_COMPLETE (0): File is complete, no recovery needed.
+ * TSFILE_CHECK_INCOMPATIBLE (-2): File content is not valid TsFile data
+ * (self-check hit an unrecognized marker).
+ */
+constexpr int64_t TSFILE_CHECK_COMPLETE = 0;
+constexpr int64_t TSFILE_CHECK_INCOMPATIBLE = -2;
+
+/**
+ * RestorableTsFileIOWriter opens and optionally recovers a TsFile.
+ * Inherits from TsFileIOWriter for continued writing after recovery.
+ *
+ * (1) If the TsFile is closed normally: has_crashed()=false, can_write()=false
+ *
+ * (2) If the TsFile is incomplete/crashed: has_crashed()=true,
+ * can_write()=true. When open() is called with truncate_corrupted=true the
+ * trailing corrupted bytes are truncated; in either case new data is
+ * appended after the existing file content.
+ *
+ * Recovered metadata is allocated in an internal arena (self_check_arena_);
+ * the WriteFile is held as a raw pointer (see write_file_owned_).
+ */
+class RestorableTsFileIOWriter : public TsFileIOWriter {
+   public:
+    RestorableTsFileIOWriter();
+    ~RestorableTsFileIOWriter();
+
+    // Non-copyable
+    RestorableTsFileIOWriter(const RestorableTsFileIOWriter&) = delete;
+    RestorableTsFileIOWriter& operator=(const RestorableTsFileIOWriter&) =
+        delete;
+
+    /**
+     * Open a TsFile for recovery/append.
+     * Uses O_RDWR|O_CREAT without O_TRUNC, so existing file content is
+     * preserved.
+     *
+     * @param file_path Path to the TsFile
+     * @param truncate_corrupted If true, truncate corrupted data. If false,
+     * do not truncate (incomplete file will remain as-is).
+     * @return E_OK on success, error code otherwise.
+     */
+    int open(const std::string& file_path, bool truncate_corrupted = true);
+
+    void close();
+
+    bool can_write() const { return can_write_; }
+    bool has_crashed() const { return crashed_; }
+    /** Last valid offset found by self-check, or one of TSFILE_CHECK_*. */
+    int64_t get_truncated_size() const { return truncated_size_; }
+    std::shared_ptr<Schema> get_known_schema() { return get_schema(); }
+
+    /**
+     * True if self-check saw a time chunk for a device with this table
+     * name, i.e. the device was recovered as aligned.
+     */
+    bool is_device_aligned(const std::string& device) const;
+
+    /**
+     * Recovered chunk group metas from self_check (actual device_id and chunk
+     * metas from file). TsFileWriter::init() uses this to rebuild schemas_
+     * with the real device keys (aligned with Java). Valid until close().
+     */
+    const std::vector<ChunkGroupMeta*>& get_recovered_chunk_group_metas()
+        const {
+        return self_check_recovered_cgm_;
+    }
+
+    /**
+     * Get the TsFileIOWriter for continued writing. Returns this (since we
+     * inherit TsFileIOWriter) when can_write() is true, nullptr otherwise.
+     */
+    TsFileIOWriter* get_tsfile_io_writer();
+
+    /**
+     * Get the WriteFile for TsFileWriter::init(). Returns nullptr unless
+     * can_write() is true. Caller must not destroy the returned pointer.
+     */
+    WriteFile* get_write_file();
+
+    std::string get_file_path() const;
+
+   private:
+    int self_check(bool truncate_corrupted);
+
+   private:
+    std::string file_path_;
+    WriteFile* write_file_;
+    bool write_file_owned_;
+
+    int64_t truncated_size_;
+    bool crashed_;
+    bool can_write_;
+
+    std::set<std::string> aligned_devices_;  // table names of aligned devices
+    common::PageArena self_check_arena_;
+    /** ChunkGroupMeta* allocated from self_check_arena_; reset device_id
+     * before arena destroy to avoid leak. */
+    std::vector<ChunkGroupMeta*> self_check_recovered_cgm_;
+};
+
+}  // namespace storage
+
+#endif  // FILE_RESTORABLE_TSFILE_IO_WRITER_H
diff --git a/cpp/src/file/tsfile_io_writer.cc b/cpp/src/file/tsfile_io_writer.cc
index 200ab84f9..2aba1e39c 100644
--- a/cpp/src/file/tsfile_io_writer.cc
+++ b/cpp/src/file/tsfile_io_writer.cc
@@ -50,17 +50,21 @@ int TsFileIOWriter::init(WriteFile* write_file) {
}
void TsFileIOWriter::destroy() {
- for (auto iter = chunk_group_meta_list_.begin();
- iter != chunk_group_meta_list_.end(); iter++) {
- if (iter.get() && iter.get()->device_id_) {
- iter.get()->device_id_.reset();
- }
- if (iter.get()) {
- for (auto chunk_meta = iter.get()->chunk_meta_list_.begin();
- chunk_meta != iter.get()->chunk_meta_list_.end();
- chunk_meta++) {
- if (chunk_meta.get()) {
- chunk_meta.get()->statistic_->destroy();
+ // When meta came from RestorableTsFileIOWriter recovery, entries live in
+ // an arena there; do not release device_id_/statistic_ here.
+ if (!chunk_group_meta_from_recovery_) {
+ for (auto iter = chunk_group_meta_list_.begin();
+ iter != chunk_group_meta_list_.end(); iter++) {
+ if (iter.get() && iter.get()->device_id_) {
+ iter.get()->device_id_.reset();
+ }
+ if (iter.get()) {
+ for (auto chunk_meta = iter.get()->chunk_meta_list_.begin();
+ chunk_meta != iter.get()->chunk_meta_list_.end();
+ chunk_meta++) {
+ if (chunk_meta.get()) {
+ chunk_meta.get()->statistic_->destroy();
+ }
}
}
}
@@ -812,6 +816,14 @@ int TsFileIOWriter::clone_node_list(
return ret;
}
+// Record the absolute offset at which write_stream_ logically begins, so
+// cur_file_position() reports file offsets after recovery.
+int TsFileIOWriter::restore_recovered_file_position(int64_t recovered_size) {
+    if (recovered_size >= 0) {
+        file_base_offset_ = recovered_size;
+        return E_OK;
+    }
+    // A negative size can never be a valid file offset.
+    return E_INVALID_ARG;
+}
+
// #if DEBUG_SE
// void DEBUG_print_byte_stream_buf(const char *tag,
// const char *buf,
@@ -844,10 +856,9 @@ int TsFileIOWriter::flush_stream_to_file() {
write_stream_consumer_.get_next_buf(write_stream_);
if (b.buf_ == nullptr) {
break;
- } else {
- if (RET_FAIL(file_->write(b.buf_, b.len_))) {
- break;
- }
+ }
+ if (b.len_ > 0 && RET_FAIL(file_->write(b.buf_, b.len_))) {
+ break;
}
}
diff --git a/cpp/src/file/tsfile_io_writer.h b/cpp/src/file/tsfile_io_writer.h
index a7d0a1404..8fcc8fa55 100644
--- a/cpp/src/file/tsfile_io_writer.h
+++ b/cpp/src/file/tsfile_io_writer.h
@@ -117,7 +117,7 @@ class TsFileIOWriter {
int flush_stream_to_file();
int write_chunk_data(common::ByteStream& chunk_data);
FORCE_INLINE int64_t cur_file_position() const {
- return write_stream_.total_size();
+ return file_base_offset_ + write_stream_.total_size();
}
FORCE_INLINE int write_buf(const char* buf, uint32_t len) {
return write_stream_.write_buf(buf, len);
@@ -184,6 +184,25 @@ class TsFileIOWriter {
int init_bloom_filter(BloomFilter& filter);
int32_t get_path_count(common::SimpleList<ChunkGroupMeta*>& cgm_list);
+ // for open file
+ void add_ts_time_index_entry(TimeseriesIndex& ts_index);
+
+ protected:
+ /** For RestorableTsFileIOWriter: append a recovered ChunkGroupMeta. */
+ void push_chunk_group_meta(ChunkGroupMeta* cgm) {
+ chunk_group_meta_list_.push_back(cgm);
+ }
+ /** True when chunk_group_meta_list_ entries are from recovery arena;
+ * destroy() must not free them. */
+ bool chunk_group_meta_from_recovery_ = false;
+ /**
+ * Recovery only: set file_base_offset_ so that cur_file_position() returns
+ * correct absolute offsets. After recovery the writer behaves as if the
+ * file was just flushed — write_stream_ starts empty and only holds new
+ * data.
+ */
+ int restore_recovered_file_position(int64_t recovered_size);
+
private:
common::PageArena meta_allocator_;
common::ByteStream write_stream_;
@@ -202,6 +221,11 @@ class TsFileIOWriter {
std::string encrypt_type_;
std::string encrypt_key_;
bool is_aligned_;
+ /** Recovery only: absolute file offset at which write_stream_ logically
+ * begins. Normal (non-recovery) path keeps this at 0. */
+ int64_t file_base_offset_ = 0;
+
+ friend class RestorableTsFileIOWriter; // uses push_chunk_group_meta
};
} // end namespace storage
diff --git a/cpp/src/file/write_file.cc b/cpp/src/file/write_file.cc
index d8a17aa56..8ad96fab2 100644
--- a/cpp/src/file/write_file.cc
+++ b/cpp/src/file/write_file.cc
@@ -32,6 +32,7 @@
#include "utils/errno_define.h"
#ifdef _WIN32
+#include <io.h>
int fsync(int);
#endif
@@ -105,7 +106,13 @@ int WriteFile::sync() {
}
int WriteFile::close() {
- ASSERT(fd_ > 0);
+ // Idempotent: already closed is not an error
+ if (fd_ < 0) {
+#ifdef DEBUG_SE
+ std::cout << "file already closed, path=" << path_;
+#endif
+ return E_OK;
+ }
if (::close(fd_) < 0) {
#ifdef DEBUG_SE
std::cout << "failed to close " << path_ << " errorno " << errno
@@ -121,6 +128,48 @@ int WriteFile::close() {
return E_OK;
}
+/**
+ * Set the length of the open file to exactly `size` bytes.
+ *
+ * @return E_OK on success; E_FILE_WRITE_ERR if the file is not open or the
+ *         underlying truncate call fails.
+ */
+int WriteFile::truncate(int64_t size) {
+    ASSERT(fd_ > 0);
+    // ASSERT may be compiled out; check fd_ < 0 at runtime like close() and
+    // get_position() do, instead of handing an invalid descriptor to the OS.
+    if (fd_ < 0) {
+        return E_FILE_WRITE_ERR;
+    }
+#ifdef _WIN32
+    // _chsize_s takes a 64-bit length. Casting to `long` (32-bit on Windows)
+    // would silently corrupt sizes above 2 GiB.
+    if (_chsize_s(fd_, static_cast<__int64>(size)) != 0) {
+        return E_FILE_WRITE_ERR;
+    }
+#else
+    if (::ftruncate(fd_, static_cast<off_t>(size)) < 0) {
+        return E_FILE_WRITE_ERR;
+    }
+#endif
+    return E_OK;
+}
+
+// Move the file offset to end-of-file so that subsequent writes append.
+int WriteFile::seek_to_end() {
+    ASSERT(fd_ > 0);
+#ifdef _WIN32
+    const bool seek_failed = _lseeki64(fd_, 0, SEEK_END) < 0;
+#else
+    const bool seek_failed = ::lseek(fd_, 0, SEEK_END) < 0;
+#endif
+    return seek_failed ? E_FILE_READ_ERR : E_OK;
+}
+
+// Current descriptor offset, queried via lseek(SEEK_CUR, 0), which does not
+// move it. Returns 0 when the file is closed or the query fails, so a result
+// of 0 cannot be told apart from "at start of file".
+int64_t WriteFile::get_position() {
+    if (fd_ >= 0) {
+#ifdef _WIN32
+        const int64_t offset = _lseeki64(fd_, 0, SEEK_CUR);
+#else
+        const int64_t offset =
+            static_cast<int64_t>(::lseek(fd_, 0, SEEK_CUR));
+#endif
+        if (offset >= 0) {
+            return offset;
+        }
+    }
+    return 0;
+}
+
} // end namespace storage
#ifdef _WIN32
diff --git a/cpp/src/file/write_file.h b/cpp/src/file/write_file.h
index 6b5a506a8..9a5bce6e8 100644
--- a/cpp/src/file/write_file.h
+++ b/cpp/src/file/write_file.h
@@ -36,7 +36,14 @@ class WriteFile {
FORCE_INLINE int get_fd() const { return fd_; }
int sync();
int close();
+ /** Truncate file to the given size (bytes). File must be open. */
+ int truncate(int64_t size);
+ /** Seek to end of file. Used after open to position for append. */
+ int seek_to_end();
FORCE_INLINE std::string get_file_path() { return path_; }
+ /** Current file offset. After seek_to_end(), equals file size (for
+ * recovery). */
+ int64_t get_position();
private:
int do_create(int flags, mode_t mode);
diff --git a/cpp/src/reader/tsfile_reader.cc b/cpp/src/reader/tsfile_reader.cc
index f97570885..84188b6a3 100644
--- a/cpp/src/reader/tsfile_reader.cc
+++ b/cpp/src/reader/tsfile_reader.cc
@@ -29,7 +29,9 @@ namespace storage {
TsFileReader::TsFileReader()
: read_file_(nullptr),
tsfile_executor_(nullptr),
- table_query_executor_(nullptr) {}
+ table_query_executor_(nullptr) {
+ tsfile_reader_meta_pa_.init(512, MOD_TSFILE_READER);
+}
TsFileReader::~TsFileReader() { close(); }
@@ -224,6 +226,10 @@ std::vector<std::shared_ptr<IDeviceID>>
TsFileReader::get_all_device_ids() {
return device_ids;
}
+// Alias of get_all_device_ids(); TsFileTreeReader::get_all_devices() calls
+// this.
+std::vector<std::shared_ptr<IDeviceID>> TsFileReader::get_all_devices() {
+    std::vector<std::shared_ptr<IDeviceID>> devices = get_all_device_ids();
+    return devices;
+}
+
int TsFileReader::get_all_devices(
std::vector<std::shared_ptr<IDeviceID>>& device_ids,
std::shared_ptr<MetaIndexNode> index_node, PageArena& pa) {
@@ -291,6 +297,53 @@ int TsFileReader::get_timeseries_schema(
return E_OK;
}
+/**
+ * Collect the timeseries metadata (without chunk metadata) of one device.
+ *
+ * The ITimeseriesIndex objects are allocated in tsfile_reader_meta_pa_ and
+ * are NOT owned by the returned shared_ptrs (no-op deleter); they presumably
+ * remain valid only while this reader and its arena are alive.
+ *
+ * @param device_id device to look up
+ * @param result    receives one entry per timeseries on success
+ * @return E_OK on success; E_INVALID_ARG if the reader has no executor
+ *         (e.g. not opened); otherwise the error from the IO reader.
+ */
+int TsFileReader::get_timeseries_metadata_impl(
+    std::shared_ptr<IDeviceID> device_id,
+    std::vector<std::shared_ptr<ITimeseriesIndex>>& result) {
+    // tsfile_executor_ starts as nullptr; avoid dereferencing it.
+    if (tsfile_executor_ == nullptr) {
+        return E_INVALID_ARG;
+    }
+    int ret = E_OK;
+    std::vector<ITimeseriesIndex*> timeseries_indexs;
+    // NOTE(review): the constructor already initialises this arena; confirm
+    // that re-initialising it on every call is intended.
+    tsfile_reader_meta_pa_.init(512, MOD_TSFILE_READER);
+    // Pointers are owned by tsfile_reader_meta_pa_; shared_ptr must not delete
+    auto noop_deleter = [](ITimeseriesIndex*) {};
+    if (RET_FAIL(tsfile_executor_->get_tsfile_io_reader()
+                     ->get_device_timeseries_meta_without_chunk_meta(
+                         device_id, timeseries_indexs,
+                         tsfile_reader_meta_pa_))) {
+        return ret;
+    }
+    result.reserve(result.size() + timeseries_indexs.size());
+    for (ITimeseriesIndex* timeseries_index : timeseries_indexs) {
+        result.emplace_back(timeseries_index, noop_deleter);
+    }
+    return ret;
+}
+
+// Look up timeseries metadata for each requested device. Devices whose
+// lookup does not return E_OK are left out of the result.
+DeviceTimeseriesMetadataMap TsFileReader::get_timeseries_metadata(
+    const std::vector<std::shared_ptr<IDeviceID>>& device_ids) {
+    DeviceTimeseriesMetadataMap metadata_by_device;
+    for (const std::shared_ptr<IDeviceID>& device : device_ids) {
+        std::vector<std::shared_ptr<ITimeseriesIndex>> series;
+        if (get_timeseries_metadata_impl(device, series) != E_OK) {
+            continue;
+        }
+        metadata_by_device.insert(std::make_pair(device, std::move(series)));
+    }
+    return metadata_by_device;
+}
+
+// Metadata for every device in the file: the device-list overload applied
+// to get_all_device_ids().
+DeviceTimeseriesMetadataMap TsFileReader::get_timeseries_metadata() {
+    return get_timeseries_metadata(get_all_device_ids());
+}
+
ResultSet* TsFileReader::read_timeseries(
const std::shared_ptr<IDeviceID>& device_id,
const std::vector<std::string>& measurement_name) {
diff --git a/cpp/src/reader/tsfile_reader.h b/cpp/src/reader/tsfile_reader.h
index 8a6ba2264..6c8563563 100644
--- a/cpp/src/reader/tsfile_reader.h
+++ b/cpp/src/reader/tsfile_reader.h
@@ -142,6 +142,13 @@ class TsFileReader {
*/
std::vector<std::shared_ptr<IDeviceID>> get_all_device_ids();
+ /**
+ * @brief Get all device IDs in the file (same as get_all_device_ids).
+ *
+ * @return std::vector<std::shared_ptr<IDeviceID>> the device list
+ */
+ std::vector<std::shared_ptr<IDeviceID>> get_all_devices();
+
/**
* @brief get the timeseries schema by the device id and measurement name
*
@@ -152,6 +159,26 @@ class TsFileReader {
*/
int get_timeseries_schema(std::shared_ptr<IDeviceID> device_id,
std::vector<MeasurementSchema>& result);
+
+ /**
+ * @brief Get timeseries metadata for specified devices.
+ *
+ * Only devices that exist in the file are included in the result.
+ * If device_ids is empty, returns an empty map.
+ *
+ * @param device_ids device list to query
+ * @return map: IDeviceID -> list of timeseries metadata (only existing)
+ */
+ DeviceTimeseriesMetadataMap get_timeseries_metadata(
+ const std::vector<std::shared_ptr<IDeviceID>>& device_ids);
+
+ /**
+ * @brief Get timeseries metadata for all devices in the file.
+ *
+ * @return map: IDeviceID -> list of timeseries metadata
+ */
+ DeviceTimeseriesMetadataMap get_timeseries_metadata();
+
/**
* @brief get the table schema by the table name
*
@@ -168,12 +195,16 @@ class TsFileReader {
std::vector<std::shared_ptr<TableSchema>> get_all_table_schemas();
private:
+ int get_timeseries_metadata_impl(
+ std::shared_ptr<IDeviceID> device_id,
+ std::vector<std::shared_ptr<ITimeseriesIndex>>& result);
int get_all_devices(std::vector<std::shared_ptr<IDeviceID>>& device_ids,
std::shared_ptr<MetaIndexNode> index_node,
common::PageArena& pa);
storage::ReadFile* read_file_;
storage::TsFileExecutor* tsfile_executor_;
storage::TableQueryExecutor* table_query_executor_;
+ common::PageArena tsfile_reader_meta_pa_;
};
} // namespace storage
diff --git a/cpp/src/reader/tsfile_tree_reader.cc
b/cpp/src/reader/tsfile_tree_reader.cc
index 2b28c8647..1b58c359d 100644
--- a/cpp/src/reader/tsfile_tree_reader.cc
+++ b/cpp/src/reader/tsfile_tree_reader.cc
@@ -68,4 +68,17 @@ std::vector<std::string>
TsFileTreeReader::get_all_device_ids() {
return ret_device_ids;
}
-} // namespace storage
\ No newline at end of file
+// The following methods forward to the underlying TsFileReader.
+std::vector<std::shared_ptr<IDeviceID>> TsFileTreeReader::get_all_devices() {
+    TsFileReader& reader = *tsfile_reader_;
+    return reader.get_all_devices();
+}
+
+DeviceTimeseriesMetadataMap TsFileTreeReader::get_timeseries_metadata(
+    const std::vector<std::shared_ptr<IDeviceID>>& device_ids) {
+    TsFileReader& reader = *tsfile_reader_;
+    return reader.get_timeseries_metadata(device_ids);
+}
+
+DeviceTimeseriesMetadataMap TsFileTreeReader::get_timeseries_metadata() {
+    TsFileReader& reader = *tsfile_reader_;
+    return reader.get_timeseries_metadata();
+}
+
+} // namespace storage
diff --git a/cpp/src/reader/tsfile_tree_reader.h
b/cpp/src/reader/tsfile_tree_reader.h
index 66341b7ed..535180409 100644
--- a/cpp/src/reader/tsfile_tree_reader.h
+++ b/cpp/src/reader/tsfile_tree_reader.h
@@ -89,14 +89,37 @@ class TsFileTreeReader {
const std::string& device_id);
/**
- * @brief Get all device identifiers in the TsFile
+ * @brief Get all device identifiers in the TsFile (string form).
*
- * @return Vector containing all device identifiers found in the TsFile
- * @note The returned vector will be empty if no devices are found or file
- * is not opened
+ * @return Vector of device identifier strings
*/
std::vector<std::string> get_all_device_ids();
+ /**
+ * @brief Get all devices in the file (IDeviceID form).
+ *
+ * @return Vector of IDeviceID for all devices
+ */
+ std::vector<std::shared_ptr<IDeviceID>> get_all_devices();
+
+ /**
+ * @brief Get timeseries metadata for specified devices.
+ *
+ * Only devices that exist in the file are included.
+ *
+ * @param device_ids device list to query
+ * @return map: IDeviceID -> list of timeseries metadata (only existing)
+ */
+ DeviceTimeseriesMetadataMap get_timeseries_metadata(
+ const std::vector<std::shared_ptr<IDeviceID>>& device_ids);
+
+ /**
+ * @brief Get timeseries metadata for all devices in the file.
+ *
+ * @return map: IDeviceID -> list of timeseries metadata
+ */
+ DeviceTimeseriesMetadataMap get_timeseries_metadata();
+
private:
std::shared_ptr<TsFileReader>
tsfile_reader_; ///< Underlying TsFile reader implementation
diff --git a/cpp/src/writer/tsfile_table_writer.cc
b/cpp/src/writer/tsfile_table_writer.cc
index 6dd990188..eb0319af8 100644
--- a/cpp/src/writer/tsfile_table_writer.cc
+++ b/cpp/src/writer/tsfile_table_writer.cc
@@ -19,6 +19,32 @@
#include "tsfile_table_writer.h"
+#include "file/restorable_tsfile_io_writer.h"
+
+namespace storage {
+
+// Constructor for appending after recovery: schema comes from restored file.
+// If TsFileWriter::init() fails, error_number holds its error code and no
+// further setup is performed.
+TsFileTableWriter::TsFileTableWriter(
+    storage::RestorableTsFileIOWriter* restorable_writer,
+    uint64_t memory_threshold)
+    : error_number(common::E_OK) {
+    tsfile_writer_ = std::make_shared<TsFileWriter>();
+    error_number = tsfile_writer_->init(restorable_writer);
+    if (error_number == common::E_OK) {
+        tsfile_writer_->set_generate_table_schema(false);
+        const std::shared_ptr<Schema> known =
+            restorable_writer->get_known_schema();
+        // exclusive_table_name_ is set only when exactly one table was
+        // recovered; otherwise it is left empty.
+        const bool single_table =
+            known != nullptr && known->table_schema_map_.size() == 1;
+        if (single_table) {
+            exclusive_table_name_ = known->table_schema_map_.begin()->first;
+        } else {
+            exclusive_table_name_.clear();
+        }
+        common::g_config_value_.chunk_group_size_threshold_ =
+            memory_threshold;
+    }
+}
+
+}  // namespace storage
+
storage::TsFileTableWriter::~TsFileTableWriter() = default;
int storage::TsFileTableWriter::register_table(
diff --git a/cpp/src/writer/tsfile_table_writer.h
b/cpp/src/writer/tsfile_table_writer.h
index d3fc918b7..ce18bc007 100644
--- a/cpp/src/writer/tsfile_table_writer.h
+++ b/cpp/src/writer/tsfile_table_writer.h
@@ -22,6 +22,7 @@
#include "writer/tsfile_writer.h"
namespace storage {
+class RestorableTsFileIOWriter;
/**
* @brief Facilitates writing structured table data into a TsFile with a
@@ -66,6 +67,19 @@ class TsFileTableWriter {
common::g_config_value_.chunk_group_size_threshold_ = memory_threshold;
}
+ /**
+ * Constructs a TsFileTableWriter from a RestorableTsFileIOWriter so that
+ * table data can be appended after recovery. Schema is taken from the
+ * restored file; do not pass a TableSchema.
+ *
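+     * @param restorable_writer Restored I/O writer; must be non-null with
+     * can_write() true (the file was found incomplete and recovered,
+     * whether or not it was truncated). Otherwise error_number is set to the
+     * error returned by TsFileWriter::init().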
+ * @param memory_threshold Optional memory threshold for buffered data.
+ */
+ explicit TsFileTableWriter(
+ storage::RestorableTsFileIOWriter* restorable_writer,
+ uint64_t memory_threshold = 128 * 1024 * 1024);
+
~TsFileTableWriter();
/**
* Registers a table schema with the writer.
diff --git a/cpp/src/writer/tsfile_tree_writer.cc
b/cpp/src/writer/tsfile_tree_writer.cc
index 59c11914d..28913c2bd 100644
--- a/cpp/src/writer/tsfile_tree_writer.cc
+++ b/cpp/src/writer/tsfile_tree_writer.cc
@@ -19,6 +19,8 @@
#include "writer/tsfile_tree_writer.h"
+#include "file/restorable_tsfile_io_writer.h"
+
namespace storage {
TsFileTreeWriter::TsFileTreeWriter(storage::WriteFile* writer_file,
@@ -28,6 +30,16 @@ TsFileTreeWriter::TsFileTreeWriter(storage::WriteFile*
writer_file,
common::g_config_value_.chunk_group_size_threshold_ = memory_threshold;
}
+// Constructor for appending after recovery: schema and alignment are rebuilt
+// from the restored file by TsFileWriter::init().
+// NOTE(review): the status returned by init() is discarded. If the restorable
+// writer is null or its can_write() is false, init() returns before attaching
+// the IO writer, and this object is unusable. Callers should check
+// can_write() first.
+TsFileTreeWriter::TsFileTreeWriter(
+    storage::RestorableTsFileIOWriter* restorable_writer,
+    uint64_t memory_threshold)
+    : tsfile_writer_(std::make_shared<TsFileWriter>()) {
+    tsfile_writer_->init(restorable_writer);
+    common::g_config_value_.chunk_group_size_threshold_ = memory_threshold;
+}
+
int TsFileTreeWriter::register_timeseries(std::string& device_id,
MeasurementSchema* schema) {
return tsfile_writer_->register_timeseries(device_id, *schema);
diff --git a/cpp/src/writer/tsfile_tree_writer.h
b/cpp/src/writer/tsfile_tree_writer.h
index 90ef0d76f..3c21c23da 100644
--- a/cpp/src/writer/tsfile_tree_writer.h
+++ b/cpp/src/writer/tsfile_tree_writer.h
@@ -26,6 +26,7 @@
#include "tsfile_writer.h"
namespace storage {
+class RestorableTsFileIOWriter;
/**
* @brief Provides an interface for writing hierarchical (tree-structured)
@@ -56,6 +57,19 @@ class TsFileTreeWriter {
explicit TsFileTreeWriter(storage::WriteFile* writer_file,
uint64_t memory_threshold = 128 * 1024 * 1024);
+ /**
+ * Constructs a TsFileTreeWriter from a RestorableTsFileIOWriter so that
+ * data can be appended after recovery (schema and alignment are taken from
+ * the restored file).
+ *
+     * @param restorable_writer Restored I/O writer; must not be null and its
+     * can_write() must be true (the file was found incomplete and recovered).
+     * Initialization errors are not reported by this constructor.
+ * @param memory_threshold Optional memory threshold for buffered data.
+ */
+ explicit TsFileTreeWriter(
+ storage::RestorableTsFileIOWriter* restorable_writer,
+ uint64_t memory_threshold = 128 * 1024 * 1024);
+
/**
* Registers a single (non-aligned) time series under the given device ID.
*
diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc
index 2c2e46b97..9a087a82f 100644
--- a/cpp/src/writer/tsfile_writer.cc
+++ b/cpp/src/writer/tsfile_writer.cc
@@ -23,6 +23,7 @@
#include "chunk_writer.h"
#include "common/config/config.h"
+#include "file/restorable_tsfile_io_writer.h"
#include "file/tsfile_io_writer.h"
#include "file/write_file.h"
#include "utils/errno_define.h"
@@ -68,7 +69,8 @@ TsFileWriter::TsFileWriter()
record_count_since_last_flush_(0),
record_count_for_next_mem_check_(
g_config_value_.record_count_for_next_mem_check_),
- write_file_created_(false) {}
+ write_file_created_(false),
+ io_writer_owned_(true) {}
TsFileWriter::~TsFileWriter() { destroy(); }
@@ -77,10 +79,10 @@ void TsFileWriter::destroy() {
delete write_file_;
write_file_ = nullptr;
}
- if (io_writer_) {
+ if (io_writer_owned_ && io_writer_) {
delete io_writer_;
- io_writer_ = nullptr;
}
+ io_writer_ = nullptr;
DeviceSchemasMapIter dev_iter;
// cppcheck-suppress postfixOperator
for (dev_iter = schemas_.begin(); dev_iter != schemas_.end(); dev_iter++) {
@@ -113,11 +115,80 @@ int TsFileWriter::init(WriteFile* write_file) {
}
write_file_ = write_file;
write_file_created_ = false;
+ io_writer_owned_ = true;
io_writer_ = new TsFileIOWriter();
io_writer_->init(write_file_);
return E_OK;
}
+// ---------------------------------------------------------------------------
+// Recovery init: attach to a RestorableTsFileIOWriter and rebuild schemas_
+// from its recovered chunk group metas (aligned with Java). The IO writer is
+// borrowed: io_writer_owned_ = false keeps destroy() from deleting it.
+// Each CGM's actual device_id from the file is used as the key, so tree and
+// table model both get correct lookups. The table model can still
+// lazy-create from table_schema_map_ in do_check_schema_table when a new
+// device appears. All new MeasurementSchemaGroup/MeasurementSchema are freed
+// in destroy().
+//
+// Returns E_INVALID_ARG when rw is null or rw->can_write() is false.
+// ---------------------------------------------------------------------------
+int TsFileWriter::init(RestorableTsFileIOWriter* rw) {
+    if (rw == nullptr || !rw->can_write()) {
+        return E_INVALID_ARG;
+    }
+    write_file_ = rw->get_write_file();
+    write_file_created_ = false;
+    io_writer_owned_ = false;
+    io_writer_ = rw;
+
+    const std::vector<ChunkGroupMeta*>& recovered =
+        rw->get_recovered_chunk_group_metas();
+    for (ChunkGroupMeta* cgm : recovered) {
+        if (cgm == nullptr || cgm->device_id_ == nullptr) {
+            continue;
+        }
+        std::shared_ptr<IDeviceID> device_id = cgm->device_id_;
+
+        // Find existing group for same device (same device may have multiple
+        // CGMs from multiple flushes). Keys are compared by value
+        // (*key == *device_id), not by pointer identity.
+        DeviceSchemasMapIter it = schemas_.begin();
+        for (; it != schemas_.end(); ++it) {
+            if (it->first != nullptr && *it->first == *device_id) {
+                break;
+            }
+        }
+
+        MeasurementSchemaGroup* group = nullptr;
+        if (it != schemas_.end()) {
+            group = it->second;
+        } else {
+            group = new MeasurementSchemaGroup;
+            // Alignment was recorded per table name during self-check.
+            group->is_aligned_ =
+                rw->is_device_aligned(device_id->get_table_name());
+            schemas_.insert(std::make_pair(device_id, group));
+        }
+
+        // Add measurement schemas from this CGM (skip time column: empty
+        // name). The first chunk seen for a measurement defines its schema.
+        for (auto iter = cgm->chunk_meta_list_.begin();
+             iter != cgm->chunk_meta_list_.end(); iter++) {
+            ChunkMeta* cm = iter.get();
+            if (cm == nullptr) {
+                continue;
+            }
+            std::string mname = cm->measurement_name_.to_std_string();
+            if (mname.empty()) {
+                continue;
+            }
+            if (group->measurement_schema_map_.find(mname) !=
+                group->measurement_schema_map_.end()) {
+                continue;
+            }
+            MeasurementSchema* ms = new MeasurementSchema(
+                mname, cm->data_type_, cm->encoding_, cm->compression_type_);
+            group->measurement_schema_map_.insert(std::make_pair(mname, ms));
+        }
+    }
+
+    // NOTE(review): presumably marks the file header as already written,
+    // since the recovered file already has one; confirm.
+    start_file_done_ = true;
+    return E_OK;
+}
+
void TsFileWriter::set_generate_table_schema(bool generate_table_schema) {
io_writer_->set_generate_table_schema(generate_table_schema);
}
@@ -495,6 +566,15 @@ int TsFileWriter::do_check_schema_table(
schemas_[device_id] = device_schema;
}
+ // After recovery, device_schema may exist but time_chunk_writer_ not yet
+ // created
+ if (IS_NULL(device_schema->time_chunk_writer_)) {
+ device_schema->time_chunk_writer_ = new TimeChunkWriter();
+ device_schema->time_chunk_writer_->init(
+ "", g_config_value_.time_encoding_type_,
+ g_config_value_.time_compress_type_);
+ }
+
uint32_t column_cnt = tablet.get_column_count();
time_chunk_writer = device_schema->time_chunk_writer_;
MeasurementSchemaMap& msm = device_schema->measurement_schema_map_;
diff --git a/cpp/src/writer/tsfile_writer.h b/cpp/src/writer/tsfile_writer.h
index e80a1232b..85c47db7f 100644
--- a/cpp/src/writer/tsfile_writer.h
+++ b/cpp/src/writer/tsfile_writer.h
@@ -37,6 +37,7 @@ namespace storage {
class WriteFile;
class ChunkWriter;
class TsFileIOWriter;
+class RestorableTsFileIOWriter;
} // namespace storage
namespace storage {
@@ -55,6 +56,7 @@ class TsFileWriter {
int open(const std::string& file_path, int flags, mode_t mode);
int open(const std::string& file_path);
int init(storage::WriteFile* write_file);
+ int init(storage::RestorableTsFileIOWriter* rw);
void set_generate_table_schema(bool generate_table_schema);
int register_timeseries(const std::string& device_id,
@@ -183,6 +185,7 @@ class TsFileWriter {
// record count for next memory check
int64_t record_count_for_next_mem_check_;
bool write_file_created_;
+ bool io_writer_owned_; // false when init(RestorableTsFileIOWriter*)
bool table_aligned_ = true;
int write_typed_column(ValueChunkWriter* value_chunk_writer,
diff --git a/cpp/test/CMakeLists.txt b/cpp/test/CMakeLists.txt
index 423381e4f..2be9c1b2c 100644
--- a/cpp/test/CMakeLists.txt
+++ b/cpp/test/CMakeLists.txt
@@ -154,10 +154,13 @@ target_link_libraries(
set_target_properties(TsFile_Test PROPERTIES RUNTIME_OUTPUT_DIRECTORY
${LIB_TSFILE_SDK_DIR})
+# On Windows, copy tsfile DLL next to the test exe so it can load at runtime
+# (and when gtest_discover_tests runs the exe). Use TARGET_FILE so the path
+# is correct for the current build config (e.g. Release).
if (WIN32)
add_custom_command(TARGET TsFile_Test POST_BUILD
- COMMAND ${CMAKE_COMMAND} -E copy
- "${LIBRARY_OUTPUT_PATH}/libtsfile.dll"
+ COMMAND ${CMAKE_COMMAND} -E copy_if_different
+ $<TARGET_FILE:tsfile>
"$<TARGET_FILE_DIR:TsFile_Test>"
COMMENT "Copying libtsfile.dll to test executable directory"
VERBATIM
@@ -165,4 +168,11 @@ if (WIN32)
endif ()
include(GoogleTest)
-gtest_discover_tests(TsFile_Test)
\ No newline at end of file
+# On Windows, delay test discovery until ctest runs (DISCOVERY_MODE PRE_TEST)
+# so the test executable is launched with the environment ctest provides
+# (e.g. PATH containing the MinGW runtime, and libtsfile.dll already copied
+# next to the exe by the POST_BUILD copy step). Discovering at build time
+# instead can fail with 0xc0000139.
+# NOTE(review): DISCOVERY_MODE needs a CMake version that supports it
+# (3.18+ per the CMake docs) - confirm against cmake_minimum_required.
+# Other platforms keep the default discovery behavior.
+if(WIN32)
+ gtest_discover_tests(TsFile_Test DISCOVERY_MODE PRE_TEST)
+else()
+ gtest_discover_tests(TsFile_Test)
+endif()
\ No newline at end of file
diff --git a/cpp/test/file/restorable_tsfile_io_writer_test.cc
b/cpp/test/file/restorable_tsfile_io_writer_test.cc
new file mode 100644
index 000000000..655995d35
--- /dev/null
+++ b/cpp/test/file/restorable_tsfile_io_writer_test.cc
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Unit tests for RestorableTsFileIOWriter.
+ * Covers: empty/invalid/complete file open, truncate recovery, continued write
+ * with TsFileWriter/TsFileTreeWriter/TsFileTableWriter, and read-back verify.
+ */
+
+#include "file/restorable_tsfile_io_writer.h"
+
+#include <gtest/gtest.h>
+
+#include <fstream>
+#include <random>
+
+#include "common/record.h"
+#include "common/schema.h"
+#include "common/tablet.h"
+#include "common/tsfile_common.h"
+#include "file/write_file.h"
+#include "reader/tsfile_reader.h"
+#include "reader/tsfile_tree_reader.h"
+#include "writer/tsfile_table_writer.h"
+#include "writer/tsfile_tree_writer.h"
+#include "writer/tsfile_writer.h"
+
+namespace storage {
+class ResultSet;
+}
+using namespace storage;
+using namespace common;
+
+//
-----------------------------------------------------------------------------
+// Helpers used by multiple tests (file flags, file size, corrupt tail)
+//
-----------------------------------------------------------------------------
+
+/**
+ * Open flags for (re)creating a write-only test file: create it if missing,
+ * truncate it if present, and add O_BINARY on Windows.
+ */
+static int GetWriteCreateFlags() {
+#ifdef _WIN32
+    const int platform_flags = O_BINARY;
+#else
+    const int platform_flags = 0;
+#endif
+    return O_WRONLY | O_CREAT | O_TRUNC | platform_flags;
+}
+
+/** Size of the file at `path` in bytes, as reported by tellg() at EOF. */
+static int64_t GetFileSize(const std::string& path) {
+    // Opening with ios::ate places the read position at end-of-file.
+    std::ifstream in(path, std::ios::binary | std::ios::ate);
+    const std::streampos end_pos = in.tellg();
+    return static_cast<int64_t>(end_pos);
+}
+
+/**
+ * Overwrite the last num_bytes of the file with zeros to simulate a corrupted
+ * tail.
+ *
+ * Does nothing when the file is missing or empty (GetFileSize() <= 0) or when
+ * num_bytes <= 0. A num_bytes larger than the file is clamped to the file
+ * size, rather than seeking to a negative offset (which fails silently and
+ * leaves the file untouched).
+ */
+static void CorruptFileTail(const std::string& path, int num_bytes) {
+    const int64_t full_size = GetFileSize(path);
+    if (full_size <= 0 || num_bytes <= 0) {
+        return;
+    }
+    const int64_t count =
+        static_cast<int64_t>(num_bytes) < full_size ? num_bytes : full_size;
+    // Opening an ofstream with ios::in as well updates the file in place
+    // instead of truncating it.
+    std::ofstream out(path, std::ios::binary | std::ios::in);
+    out.seekp(static_cast<std::streamoff>(full_size - count));
+    const std::string zeros(static_cast<size_t>(count), '\0');
+    out.write(zeros.data(), static_cast<std::streamsize>(count));
+    out.close();
+}
+
+/**
+ * Queries all devices known to `reader` for `measurement_ids` over the full
+ * int64 time range and counts the rows produced. Returns -1 if the query
+ * fails or yields no result set. The result set is released before returning.
+ */
+static int CountTreeReaderRows(
+    TsFileTreeReader& reader, const std::vector<std::string>& measurement_ids) {
+    auto device_ids = reader.get_all_device_ids();
+    ResultSet* result_set = nullptr;
+    const int query_ret = reader.query(device_ids, measurement_ids, INT64_MIN,
+                                       INT64_MAX, result_set);
+    if (query_ret != E_OK || result_set == nullptr) {
+        return -1;
+    }
+    int rows = 0;
+    auto row_it = result_set->iterator();
+    while (row_it.hasNext()) {
+        ++rows;
+        row_it.next();
+    }
+    reader.destroy_query_data_set(result_set);
+    return rows;
+}
+
+//
-----------------------------------------------------------------------------
+// Test fixture
+//
-----------------------------------------------------------------------------
+
+/**
+ * Fixture giving each test a fresh, randomly named .tsfile path that is
+ * removed before and after the test, with libtsfile initialized around it.
+ */
+class RestorableTsFileIOWriterTest : public ::testing::Test {
+ protected:
+    void SetUp() override {
+        libtsfile_init();
+        file_name_ = std::string("restorable_tsfile_io_writer_test_") +
+                     generate_random_string(10) + std::string(".tsfile");
+        // Start clean in case an earlier run left a file with this name.
+        remove(file_name_.c_str());
+    }
+
+    void TearDown() override {
+        remove(file_name_.c_str());
+        libtsfile_destroy();
+    }
+
+    int64_t GetCurrentFileSize() const { return GetFileSize(file_name_); }
+    void CorruptCurrentFileTail(int num_bytes) {
+        CorruptFileTail(file_name_, num_bytes);
+    }
+
+    std::string file_name_;
+
+    /**
+     * Returns `length` characters drawn uniformly from [0-9a-zA-Z], or an
+     * empty string when length <= 0 (reserve(size_t(-1)) would throw).
+     */
+    static std::string generate_random_string(int length) {
+        const std::string chars =
+            "0123456789"
+            "abcdefghijklmnopqrstuvwxyz"
+            "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+        std::string s;
+        if (length <= 0) {
+            return s;
+        }
+        std::random_device rd;
+        std::mt19937 gen(rd());
+        // Bound derived from the alphabet so the two cannot drift apart.
+        std::uniform_int_distribution<size_t> dis(0, chars.size() - 1);
+        s.reserve(static_cast<size_t>(length));
+        for (int i = 0; i < length; ++i) {
+            s += chars[dis(gen)];
+        }
+        return s;
+    }
+};
+
+//
-----------------------------------------------------------------------------
+// Open behavior: empty file, bad magic, complete file, truncated file,
+// header-only
+//
-----------------------------------------------------------------------------
+
+TEST_F(RestorableTsFileIOWriterTest, OpenEmptyFile) {
+    // SetUp() removed file_name_, so open() starts without any existing data
+    // and must yield a writable writer that reports has_crashed().
+    RestorableTsFileIOWriter rw;
+    ASSERT_EQ(rw.open(file_name_, true), E_OK);
+    EXPECT_TRUE(rw.can_write());
+    EXPECT_TRUE(rw.has_crashed());
+    EXPECT_EQ(rw.get_truncated_size(), 0);
+    EXPECT_NE(rw.get_tsfile_io_writer(), nullptr);
+    rw.close();
+}
+
+TEST_F(RestorableTsFileIOWriterTest, OpenBadMagicFile) {
+    // Seed the file with 7 bytes that are not the TsFile magic header; the
+    // stream is closed when the inner scope ends.
+    {
+        std::ofstream bad_file(file_name_);
+        bad_file.write("BadFile", 7);
+    }
+
+    RestorableTsFileIOWriter rw;
+    EXPECT_NE(rw.open(file_name_, true), E_OK);
+    EXPECT_EQ(rw.get_truncated_size(), TSFILE_CHECK_INCOMPATIBLE);
+    rw.close();
+}
+
+TEST_F(RestorableTsFileIOWriterTest, OpenCompleteFile) {
+    // Produce a cleanly closed file holding d1.s1 at t=1 and t=2.
+    TsFileWriter seed_writer;
+    ASSERT_EQ(seed_writer.open(file_name_, GetWriteCreateFlags(), 0666), E_OK);
+    seed_writer.register_timeseries(
+        "d1", MeasurementSchema("s1", FLOAT, GORILLA,
+                                CompressionType::UNCOMPRESSED));
+    TsRecord record(1, "d1");
+    record.add_point("s1", 1.0f);
+    const int64_t timestamps[] = {1, 2};
+    for (const int64_t ts : timestamps) {
+        record.timestamp_ = ts;
+        seed_writer.write_record(record);
+    }
+    seed_writer.flush();
+    seed_writer.close();
+
+    // A complete file is reported as such: not writable, not crashed, and
+    // no IO writer is handed out.
+    RestorableTsFileIOWriter rw;
+    ASSERT_EQ(rw.open(file_name_, true), E_OK);
+    EXPECT_FALSE(rw.can_write());
+    EXPECT_FALSE(rw.has_crashed());
+    EXPECT_EQ(rw.get_truncated_size(), TSFILE_CHECK_COMPLETE);
+    EXPECT_EQ(rw.get_tsfile_io_writer(), nullptr);
+    rw.close();
+}
+
+TEST_F(RestorableTsFileIOWriterTest, OpenTruncatedFile) {
+    // Write one FLOAT point to d1.s1 and close the file normally.
+    TsFileWriter seed_writer;
+    ASSERT_EQ(seed_writer.open(file_name_, GetWriteCreateFlags(), 0666), E_OK);
+    seed_writer.register_timeseries(
+        "d1",
+        MeasurementSchema("s1", FLOAT, RLE, CompressionType::UNCOMPRESSED));
+    TsRecord record(1, "d1");
+    record.add_point("s1", 1.0f);
+    seed_writer.write_record(record);
+    seed_writer.flush();
+    seed_writer.close();
+
+    // Zero the last 5 bytes so the file is no longer complete.
+    const int64_t full_size = GetCurrentFileSize();
+    CorruptCurrentFileTail(5);
+
+    // Recovery keeps at least the magic string plus version byte, and never
+    // more than the original file.
+    const int64_t header_size =
+        static_cast<int64_t>(MAGIC_STRING_TSFILE_LEN + 1);
+    RestorableTsFileIOWriter rw;
+    ASSERT_EQ(rw.open(file_name_, true), E_OK);
+    EXPECT_TRUE(rw.can_write());
+    EXPECT_TRUE(rw.has_crashed());
+    EXPECT_GE(rw.get_truncated_size(), header_size);
+    EXPECT_LE(rw.get_truncated_size(), full_size);
+    EXPECT_NE(rw.get_tsfile_io_writer(), nullptr);
+    rw.close();
+}
+
+TEST_F(RestorableTsFileIOWriterTest, OpenFileWithOnlyHeader) {
+    // Hand-craft a file containing only the magic string and version byte.
+    int rdwr_flags = O_RDWR | O_CREAT | O_TRUNC;
+#ifdef _WIN32
+    rdwr_flags |= O_BINARY;
+#endif
+    WriteFile header_file;
+    ASSERT_EQ(header_file.create(file_name_, rdwr_flags, 0666), E_OK);
+    header_file.write(MAGIC_STRING_TSFILE, MAGIC_STRING_TSFILE_LEN);
+    header_file.write(&VERSION_NUM_BYTE, 1);
+    header_file.close();
+
+    // Recovery keeps exactly the header and leaves the writer usable.
+    RestorableTsFileIOWriter rw;
+    ASSERT_EQ(rw.open(file_name_, true), E_OK);
+    EXPECT_TRUE(rw.can_write());
+    EXPECT_TRUE(rw.has_crashed());
+    EXPECT_EQ(rw.get_truncated_size(), MAGIC_STRING_TSFILE_LEN + 1);
+    rw.close();
+}
+
+//
-----------------------------------------------------------------------------
+// Recovery + continued write: TsFileWriter::init(rw) rebuilds schemas_ from
+// recovered chunk group metas using actual device_id from file (not
+// table_name), so both tree and table model get correct lookups.
+//
-----------------------------------------------------------------------------
+
+TEST_F(RestorableTsFileIOWriterTest, TruncateRecoversAndProvidesWriter) {
+    // Seed a complete file with d1.s1 at t=1 and t=2, then damage its tail.
+    TsFileWriter seed_writer;
+    ASSERT_EQ(seed_writer.open(file_name_, GetWriteCreateFlags(), 0666), E_OK);
+    seed_writer.register_timeseries(
+        "d1", MeasurementSchema("s1", FLOAT, GORILLA,
+                                CompressionType::UNCOMPRESSED));
+    TsRecord record(1, "d1");
+    record.add_point("s1", 1.0f);
+    const int64_t timestamps[] = {1, 2};
+    for (const int64_t ts : timestamps) {
+        record.timestamp_ = ts;
+        seed_writer.write_record(record);
+    }
+    seed_writer.flush();
+    seed_writer.close();
+
+    CorruptCurrentFileTail(3);
+
+    // Recovery exposes an IO writer, the write file and the original path.
+    RestorableTsFileIOWriter rw;
+    ASSERT_EQ(rw.open(file_name_, true), E_OK);
+    ASSERT_TRUE(rw.can_write());
+    ASSERT_NE(rw.get_tsfile_io_writer(), nullptr);
+    ASSERT_NE(rw.get_write_file(), nullptr);
+    EXPECT_EQ(rw.get_file_path(), file_name_);
+
+    // Continue writing d1.s1 at t=3 through a TsFileWriter bound to rw.
+    TsFileWriter append_writer;
+    ASSERT_EQ(append_writer.init(&rw), E_OK);
+    TsRecord next_record(3, "d1");
+    next_record.add_point("s1", 3.0f);
+    ASSERT_EQ(append_writer.write_record(next_record), E_OK);
+    append_writer.close();
+    rw.close();
+}
+
+// Multi-segment device path: recovery must key the schema by the actual
+// device_id read from the file (not by table_name), so that a later write to
+// the same path finds it.
+TEST_F(RestorableTsFileIOWriterTest,
+       TreeModelMultiSegmentDeviceRecoverAndWrite) {
+    const std::string device_path = "root.d1";
+
+    TsFileWriter seed_writer;
+    ASSERT_EQ(seed_writer.open(file_name_, GetWriteCreateFlags(), 0666), E_OK);
+    seed_writer.register_timeseries(
+        device_path, MeasurementSchema("s1", FLOAT, GORILLA,
+                                       CompressionType::UNCOMPRESSED));
+    TsRecord first(1, device_path);
+    first.add_point("s1", 1.0f);
+    ASSERT_EQ(seed_writer.write_record(first), E_OK);
+    seed_writer.flush();
+    seed_writer.close();
+
+    CorruptCurrentFileTail(3);
+
+    RestorableTsFileIOWriter rw;
+    ASSERT_EQ(rw.open(file_name_, true), E_OK);
+    ASSERT_TRUE(rw.can_write());
+
+    TsFileWriter append_writer;
+    ASSERT_EQ(append_writer.init(&rw), E_OK);
+    TsRecord second(2, device_path);
+    second.add_point("s1", 2.0f);
+    ASSERT_EQ(append_writer.write_record(second), E_OK);
+    append_writer.flush();
+    append_writer.close();
+    rw.close();
+
+    // Both points must be readable under a single device.
+    TsFileTreeReader reader;
+    reader.open(file_name_);
+    ASSERT_EQ(reader.get_all_device_ids().size(), 1u);
+    ASSERT_EQ(CountTreeReaderRows(reader, {"s1"}), 2);
+    reader.close();
+}
+
+//
-----------------------------------------------------------------------------
+// Recovery + continued write with TsFileTreeWriter, then read-back verify
+//
-----------------------------------------------------------------------------
+
+TEST_F(RestorableTsFileIOWriterTest,
+       MultiDeviceRecoverAndWriteWithTreeWriter) {
+    // Seed: d1 = {s1: FLOAT, s2: INT32}, d2 = {s1: FLOAT, s2: DOUBLE},
+    // one row per device.
+    TsFileWriter seed_writer;
+    ASSERT_EQ(seed_writer.open(file_name_, GetWriteCreateFlags(), 0666), E_OK);
+    seed_writer.register_timeseries("d1", MeasurementSchema("s1", FLOAT));
+    seed_writer.register_timeseries("d1", MeasurementSchema("s2", INT32));
+    seed_writer.register_timeseries("d2", MeasurementSchema("s1", FLOAT));
+    seed_writer.register_timeseries("d2", MeasurementSchema("s2", DOUBLE));
+
+    TsRecord d1_t1(1, "d1");
+    d1_t1.add_point("s1", 1.0f);
+    d1_t1.add_point("s2", 10);
+    ASSERT_EQ(seed_writer.write_record(d1_t1), E_OK);
+    TsRecord d2_t2(2, "d2");
+    d2_t2.add_point("s1", 2.0f);
+    d2_t2.add_point("s2", 20.0);
+    ASSERT_EQ(seed_writer.write_record(d2_t2), E_OK);
+    seed_writer.flush();
+    seed_writer.close();
+
+    CorruptCurrentFileTail(3);
+
+    RestorableTsFileIOWriter rw;
+    ASSERT_EQ(rw.open(file_name_, true), E_OK);
+    ASSERT_TRUE(rw.can_write());
+
+    // Append one more row per device through the tree-model writer.
+    TsFileTreeWriter tree_writer(&rw);
+    TsRecord d1_t3(3, "d1");
+    d1_t3.add_point("s1", 3.0f);
+    d1_t3.add_point("s2", 30);
+    ASSERT_EQ(tree_writer.write(d1_t3), E_OK);
+    TsRecord d2_t4(4, "d2");
+    d2_t4.add_point("s1", 4.0f);
+    d2_t4.add_point("s2", 40.0);
+    ASSERT_EQ(tree_writer.write(d2_t4), E_OK);
+    tree_writer.flush();
+    tree_writer.close();
+
+    // Two devices, four rows in total (t = 1..4).
+    TsFileTreeReader reader;
+    reader.open(file_name_);
+    ASSERT_EQ(reader.get_all_device_ids().size(), 2u);
+    ASSERT_EQ(CountTreeReaderRows(reader, {"s1", "s2"}), 4);
+    reader.close();
+}
+
+//
-----------------------------------------------------------------------------
+// Tree model + Recovery + continued write with aligned timeseries, then
+// read-back verify
+//
-----------------------------------------------------------------------------
+
+TEST_F(RestorableTsFileIOWriterTest, AlignedTimeseriesRecoverAndWrite) {
+    TsFileWriter seed_writer;
+    ASSERT_EQ(seed_writer.open(file_name_, GetWriteCreateFlags(), 0666), E_OK);
+    // d1 is registered as aligned with two FLOAT measurements.
+    // NOTE(review): the raw new'd schemas are handed to the writer and never
+    // deleted here; presumably the writer takes ownership - confirm.
+    std::vector<MeasurementSchema*> aligned_schemas{
+        new MeasurementSchema("s1", FLOAT), new MeasurementSchema("s2", FLOAT)};
+    seed_writer.register_aligned_timeseries("d1", aligned_schemas);
+
+    TsRecord row_t1(1, "d1");
+    row_t1.add_point("s1", 1.0f);
+    row_t1.add_point("s2", 2.0f);
+    ASSERT_EQ(seed_writer.write_record_aligned(row_t1), E_OK);
+    TsRecord row_t2(2, "d1");
+    row_t2.add_point("s1", 3.0f);
+    row_t2.add_point("s2", 4.0f);
+    ASSERT_EQ(seed_writer.write_record_aligned(row_t2), E_OK);
+    seed_writer.flush();
+    seed_writer.close();
+
+    CorruptCurrentFileTail(3);
+
+    RestorableTsFileIOWriter rw;
+    ASSERT_EQ(rw.open(file_name_, true), E_OK);
+    ASSERT_TRUE(rw.can_write());
+
+    // Append a third aligned row through the tree-model writer.
+    TsFileTreeWriter append_writer(&rw);
+    TsRecord row_t3(3, "d1");
+    row_t3.add_point("s1", 5.0f);
+    row_t3.add_point("s2", 6.0f);
+    ASSERT_EQ(append_writer.write(row_t3), E_OK);
+    append_writer.flush();
+    append_writer.close();
+
+    TsFileTreeReader reader;
+    reader.open(file_name_);
+    ASSERT_EQ(reader.get_all_device_ids().size(), 1u);
+    ASSERT_EQ(CountTreeReaderRows(reader, {"s1", "s2"}), 3);
+    reader.close();
+}
+
+//
-----------------------------------------------------------------------------
+// Recovery + continued write with TsFileTableWriter (table model), then
+// read-back
+//
-----------------------------------------------------------------------------
+
+TEST_F(RestorableTsFileIOWriterTest, TableWriterRecoverAndWrite) {
+    std::vector<MeasurementSchema*> measurement_schemas;
+    measurement_schemas.push_back(new MeasurementSchema("device", STRING));
+    measurement_schemas.push_back(new MeasurementSchema("value", DOUBLE));
+    std::vector<ColumnCategory> column_categories = {ColumnCategory::TAG,
+                                                     ColumnCategory::FIELD};
+    TableSchema table_schema("test_table", measurement_schemas,
+                             column_categories);
+
+    // Phase 1: write two flushed tablets (device0 at t=0..9, device1 at
+    // t=10..19) with a regular TsFileTableWriter, then close the file.
+    WriteFile write_file;
+    ASSERT_EQ(write_file.create(file_name_, GetWriteCreateFlags(), 0666), E_OK);
+    TsFileTableWriter table_writer(&write_file, &table_schema);
+    const std::string table_name = "test_table";
+
+    {
+        Tablet tablet(table_schema.get_measurement_names(),
+                      table_schema.get_data_types(), 10);
+        tablet.set_table_name(table_name);
+        for (int i = 0; i < 10; i++) {
+            tablet.add_timestamp(i, static_cast<int64_t>(i));
+            tablet.add_value(i, "device", "device0");
+            tablet.add_value(i, "value", i * 1.1);
+        }
+        ASSERT_EQ(table_writer.write_table(tablet), E_OK);
+        ASSERT_EQ(table_writer.flush(), E_OK);
+    }
+    {
+        Tablet tablet(table_schema.get_measurement_names(),
+                      table_schema.get_data_types(), 10);
+        tablet.set_table_name(table_name);
+        for (int i = 0; i < 10; i++) {
+            tablet.add_timestamp(i, static_cast<int64_t>(i + 10));
+            tablet.add_value(i, "device", "device1");
+            tablet.add_value(i, "value", i * 1.1);
+        }
+        ASSERT_EQ(table_writer.write_table(tablet), E_OK);
+        ASSERT_EQ(table_writer.flush(), E_OK);
+    }
+
+    table_writer.close();
+    write_file.close();
+
+    CorruptCurrentFileTail(3);
+
+    // Phase 2: recover, then append 20 more rows (t=20..39) through a
+    // TsFileTableWriter bound to the restorable writer.
+    RestorableTsFileIOWriter rw;
+    ASSERT_EQ(rw.open(file_name_, true), E_OK);
+    ASSERT_TRUE(rw.can_write());
+
+    TsFileTableWriter table_writer2(&rw);
+    // NOTE(review): the tag column is addressed as "__level1" here rather
+    // than "device"; presumably that is how the recovered schema names it -
+    // confirm.
+    std::vector<std::string> value_col = {"__level1", "value"};
+    std::vector<TSDataType> value_types = {STRING, DOUBLE};
+    {
+        Tablet tablet2(value_col, value_types, 10);
+        tablet2.set_table_name(table_name);
+        for (int i = 0; i < 10; i++) {
+            tablet2.add_timestamp(i, static_cast<int64_t>(i + 20));
+            tablet2.add_value(i, "__level1", "device0");
+            tablet2.add_value(i, "value", (i + 10) * 1.1);
+        }
+        ASSERT_EQ(table_writer2.write_table(tablet2), E_OK);
+        ASSERT_EQ(table_writer2.flush(), E_OK);
+    }
+    {
+        Tablet tablet2(value_col, value_types, 10);
+        tablet2.set_table_name(table_name);
+        for (int i = 0; i < 10; i++) {
+            tablet2.add_timestamp(i, static_cast<int64_t>(i + 30));
+            tablet2.add_value(i, "__level1", "device1");
+            tablet2.add_value(i, "value", (i + 10) * 1.1);
+        }
+        ASSERT_EQ(table_writer2.write_table(tablet2), E_OK);
+        ASSERT_EQ(table_writer2.flush(), E_OK);
+    }
+
+    table_writer2.close();
+
+    // Read back every row in [0, 10000]: 20 seeded + 20 appended.
+    TsFileReader table_reader;
+    ASSERT_EQ(table_reader.open(file_name_), E_OK);
+    ResultSet* tmp_result_set = nullptr;
+    // Check the query before touching its result set; otherwise a failed
+    // query leads to a null-pointer dereference in the loop below.
+    ASSERT_EQ(table_reader.query("test_table", {"__level1", "value"}, 0, 10000,
+                                 tmp_result_set, nullptr),
+              E_OK);
+    ASSERT_NE(tmp_result_set, nullptr);
+    auto* table_result_set = static_cast<TableResultSet*>(tmp_result_set);
+    bool has_next = false;
+    int64_t row_num = 0;
+    while (IS_SUCC(table_result_set->next(has_next)) && has_next) {
+        (void)table_result_set->get_row_record();
+        row_num++;
+    }
+    ASSERT_EQ(row_num, 40);
+    table_reader.destroy_query_data_set(tmp_result_set);
+    table_reader.close();
+}
diff --git a/cpp/test/file/write_file_test.cc b/cpp/test/file/write_file_test.cc
index 1345b7bee..3cb9edd25 100644
--- a/cpp/test/file/write_file_test.cc
+++ b/cpp/test/file/write_file_test.cc
@@ -112,3 +112,32 @@ TEST_F(WriteFileTest, CloseFile) {
EXPECT_EQ(write_file.write(content, content_len), E_OK);
EXPECT_EQ(write_file.close(), E_OK);
}
+
+// Truncating a file to a given size (used by RestorableTsFileIOWriter after
+// recovery) keeps exactly the leading bytes.
+TEST_F(WriteFileTest, TruncateFile) {
+    WriteFile write_file;
+    const std::string file_name = "test_file_truncate.dat";
+
+    remove(file_name.c_str());
+
+    int flags = O_RDWR | O_CREAT | O_TRUNC;
+#ifdef _WIN32
+    flags |= O_BINARY;
+#endif
+    mode_t mode = 0666;
+    ASSERT_EQ(write_file.create(file_name, flags, mode), E_OK);
+    EXPECT_TRUE(write_file.file_opened());
+
+    const char* content = "Hello, Truncate World!";
+    const uint32_t content_len = static_cast<uint32_t>(strlen(content));
+    EXPECT_EQ(write_file.write(content, content_len), E_OK);
+    // Keep only the first 7 bytes: "Hello, ".
+    EXPECT_EQ(write_file.truncate(7), E_OK);
+    EXPECT_EQ(write_file.close(), E_OK);
+
+    std::string file_content;
+    {
+        // Read back in binary mode, and close the stream before remove():
+        // on Windows, remove() typically fails while the file is still open,
+        // which would leave the test file behind.
+        std::ifstream file(file_name, std::ios::binary);
+        file_content.assign(std::istreambuf_iterator<char>(file),
+                            std::istreambuf_iterator<char>());
+    }
+    EXPECT_EQ(file_content, "Hello, ");
+    remove(file_name.c_str());
+}
diff --git a/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc
b/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc
index ffcaa20fa..aa4ff2544 100644
--- a/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc
+++ b/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc
@@ -307,9 +307,12 @@ TEST_F(TsFileTreeReaderTest, ExtendedRowsAndColumnsTest) {
}
const int NUM_ROWS = 100;
+ int start_time = 0, end_time = -1;
for (int row = 0; row < NUM_ROWS; ++row) {
for (const auto& device_id : device_ids) {
- TsRecord record(device_id, row * 1000);
+ int timestamp = row * 1000;
+ TsRecord record(device_id, timestamp);
+ end_time = timestamp;
for (size_t i = 0; i < measurement_ids.size(); ++i) {
switch (data_types[i]) {
case INT64:
@@ -342,6 +345,18 @@ TEST_F(TsFileTreeReaderTest, ExtendedRowsAndColumnsTest) {
TsFileTreeReader reader;
reader.open(file_name_);
+ auto device_timeseries_map = reader.get_timeseries_metadata();
+ ASSERT_EQ(device_timeseries_map.size(), device_ids.size());
+ auto device_timeseries = device_timeseries_map.at(
+ std::make_shared<StringArrayDeviceID>(device_ids[0]));
+ ASSERT_EQ(device_timeseries.size(), measurement_ids.size());
+ ASSERT_EQ(
+ device_timeseries[0]->get_measurement_name().to_std_string(),
+ *std::min_element(measurement_ids.begin(), measurement_ids.end()));
+ ASSERT_EQ(device_timeseries[0]->get_statistic()->start_time_, start_time);
+ ASSERT_EQ(device_timeseries[0]->get_statistic()->end_time_, end_time);
+ ASSERT_EQ(device_timeseries[0]->get_statistic()->count_, NUM_ROWS);
+ // Verify get_all_device_ids / get_all_devices
auto read_device_ids = reader.get_all_device_ids();
ASSERT_EQ(read_device_ids.size(), device_ids.size());
for (size_t i = 0; i < device_ids.size(); ++i) {
diff --git a/cpp/test/reader/tsfile_reader_test.cc
b/cpp/test/reader/tsfile_reader_test.cc
index b426d7ec7..54127e072 100644
--- a/cpp/test/reader/tsfile_reader_test.cc
+++ b/cpp/test/reader/tsfile_reader_test.cc
@@ -198,7 +198,7 @@ TEST_F(TsFileReaderTest, GetAllDevice) {
}
TEST_F(TsFileReaderTest, GetTimeseriesSchema) {
- std::vector<std::string> device_path = {"device", "device.ln"};
+ std::vector<std::string> device_path = {"device.ln1", "device.ln2 "};
std::vector<std::string> measurement_name = {"temperature", "humidity"};
common::TSDataType data_type = common::TSDataType::INT32;
common::TSEncoding encoding = common::TSEncoding::PLAIN;
@@ -236,6 +236,31 @@ TEST_F(TsFileReaderTest, GetTimeseriesSchema) {
measurement_schemas);
ASSERT_EQ(measurement_schemas[1].measurement_name_, measurement_name[1]);
ASSERT_EQ(measurement_schemas[1].data_type_, TSDataType::INT32);
+
+ std::vector<std::shared_ptr<IDeviceID>> one_device = {
+ std::make_shared<StringArrayDeviceID>(device_path[0])};
+ auto one_meta = reader.get_timeseries_metadata(one_device);
+ ASSERT_EQ(one_meta.size(), 1u);
+ auto timeseries_list = one_meta.begin()->second;
+ ASSERT_EQ(timeseries_list.size(), 1u);
+ ASSERT_EQ(timeseries_list[0]->get_measurement_name().to_std_string(),
+ measurement_name[0]);
+ ASSERT_EQ(timeseries_list[0]->get_statistic()->start_time_, 1622505600000);
+ ASSERT_EQ(timeseries_list[0]->get_statistic()->end_time_, 1622505600000);
+ ASSERT_EQ(timeseries_list[0]->get_statistic()->count_, 1);
+
+ auto device_timeseries_map = reader.get_timeseries_metadata();
+ ASSERT_EQ(device_timeseries_map.size(), 2u);
+ auto device_timeseries_1 = device_timeseries_map.at(
+ std::make_shared<StringArrayDeviceID>(device_path[1]));
+ ASSERT_EQ(device_timeseries_1.size(), 1u);
+ ASSERT_EQ(device_timeseries_1[0]->get_measurement_name().to_std_string(),
+ measurement_name[1]);
+ ASSERT_EQ(device_timeseries_1[0]->get_statistic()->start_time_,
+ 1622505600000);
+ ASSERT_EQ(device_timeseries_1[0]->get_statistic()->end_time_,
+ 1622505600000);
+ ASSERT_EQ(device_timeseries_1[0]->get_statistic()->count_, 1);
reader.close();
}
diff --git a/cpp/test/writer/tsfile_writer_test.cc
b/cpp/test/writer/tsfile_writer_test.cc
index 25684e726..30fded6eb 100644
--- a/cpp/test/writer/tsfile_writer_test.cc
+++ b/cpp/test/writer/tsfile_writer_test.cc
@@ -115,11 +115,6 @@ class TsFileWriterTest : public ::testing::Test {
class TsFileWriterTestSimple : public ::testing::Test {};
-TEST_F(TsFileWriterTestSimple, InitWithNullWriteFile) {
- TsFileWriter writer;
- ASSERT_EQ(writer.init(nullptr), E_INVALID_ARG);
-}
-
TEST_F(TsFileWriterTest, WriteDiffDataType) {
std::string device_name = "test_table";
common::TSEncoding encoding = common::TSEncoding::PLAIN;