This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new cd42ad4a3 feat(ts): Add data structures for uncompressed chunks (#3091)
cd42ad4a3 is described below

commit cd42ad4a3ea3834d3216822d17b9cea55f54ad34
Author: RX Xiao <[email protected]>
AuthorDate: Tue Aug 5 15:55:04 2025 +0800

    feat(ts): Add data structures for uncompressed chunks (#3091)
    
    Co-authored-by: Twice <[email protected]>
    Co-authored-by: Twice <[email protected]>
---
 src/types/timeseries.cc                      | 459 +++++++++++++++++++++++++++
 src/types/timeseries.h                       | 197 ++++++++++++
 tests/cppunit/types/timeseries_chunk_test.cc | 375 ++++++++++++++++++++++
 3 files changed, 1031 insertions(+)

diff --git a/src/types/timeseries.cc b/src/types/timeseries.cc
new file mode 100644
index 000000000..89f0e1a8c
--- /dev/null
+++ b/src/types/timeseries.cc
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "timeseries.h"
+
+#include <algorithm>
+
+#include "encoding.h"
+
+using AddResult = TSChunk::AddResult;
+using SampleBatch = TSChunk::SampleBatch;
+using SampleBatchSlice = TSChunk::SampleBatchSlice;
+
+TSChunkPtr CreateTSChunkFromData(nonstd::span<char> data) {
+  auto chunk_meta = TSChunk::MetaData();
+  Slice input(data.data(), TSChunk::MetaData::kEncodedSize);
+  chunk_meta.Decode(&input);
+  if (!chunk_meta.is_compressed) {
+    return std::make_unique<UncompTSChunk>(std::move(data));
+  } else {
+    // TODO: compressed chunk
+    unreachable();
+  }
+}
+
+OwnedTSChunk CreateEmptyOwnedTSChunk(bool is_compressed) {
+  auto metadata = TSChunk::MetaData(is_compressed, 0);
+  std::string data = metadata.Encode();
+  return {CreateTSChunkFromData(data), std::move(data)};
+}
+
+TSChunk::SampleBatch::SampleBatch(std::vector<TSSample> samples, 
DuplicatePolicy policy)
+    : samples_(std::move(samples)), policy_(policy) {
+  size_t count = samples_.size();
+  add_results_.resize(count, AddResult::kNone);
+  indexes_.resize(count);
+  for (size_t i = 0; i < count; ++i) {
+    indexes_[i] = i;
+  }
+  sortAndOrganize();
+}
+
+void TSChunk::SampleBatch::Expire(uint64_t last_ts, uint64_t retention) {
+  if (retention == 0) return;
+  for (auto idx : indexes_) {
+    if (samples_[idx].ts + retention < last_ts) {
+      add_results_[idx] = AddResult::kOld;
+    } else if (samples_[idx].ts > last_ts) {
+      last_ts = samples_[idx].ts;
+    }
+  }
+}
+
+void TSChunk::SampleBatch::sortAndOrganize() {
+  auto count = samples_.size();
+  if (0 == count) return;
+
+  // must be a stable sort so equal-timestamp samples keep their input order for duplicate handling
+  std::stable_sort(indexes_.begin(), indexes_.end(), [this](size_t a, size_t 
b) { return samples_[a] < samples_[b]; });
+  std::vector<TSSample> samples_sorted;
+  samples_sorted.reserve(indexes_.size());
+  for (size_t i = 0; i < count; ++i) {
+    samples_sorted.push_back(samples_[indexes_[i]]);
+  }
+  samples_ = std::move(samples_sorted);
+
+  size_t prev_idx = 0;
+  add_results_[0] = AddResult::kNone;
+  for (size_t i = 1; i < count; ++i) {
+    TSSample* cur = &samples_[i];
+    auto result = MergeSamplesValue(samples_[prev_idx], *cur, policy_);
+    if (result == AddResult::kNone) {
+      prev_idx = i;
+    }
+    add_results_[i] = result;
+  }
+}
+
+SampleBatchSlice TSChunk::SampleBatchSlice::SliceByCount(uint64_t first, int 
count, uint64_t* last_ts) {
+  if (sample_span_.empty()) {
+    return {};
+  }
+
+  auto start_it = std::lower_bound(sample_span_.begin(), sample_span_.end(), 
TSSample{first, 0.0});
+  if (start_it == sample_span_.end()) {
+    return {};
+  }
+
+  size_t start_idx = start_it - sample_span_.begin();
+
+  if (count < 0) {
+    if (last_ts) {
+      *last_ts = sample_span_.back().ts;
+    }
+    return createSampleSlice(start_idx, sample_span_.size());
+  }
+
+  size_t end_idx = start_idx;
+  while (end_idx < sample_span_.size() && count > 0) {
+    if (add_result_span_[end_idx] == AddResult::kNone) {
+      if (last_ts) {
+        *last_ts = sample_span_[end_idx].ts;
+      }
+      count--;
+    }
+    end_idx++;
+  }
+
+  return createSampleSlice(start_idx, end_idx);
+}
+
+SampleBatchSlice TSChunk::SampleBatchSlice::SliceByTimestamps(uint64_t first, 
uint64_t last, bool contain_last) {
+  if (sample_span_.empty()) {
+    return {};
+  }
+
+  auto start_it = std::lower_bound(sample_span_.begin(), sample_span_.end(), 
TSSample{first, 0.0});
+  auto end_it = contain_last ? std::upper_bound(sample_span_.begin(), 
sample_span_.end(), TSSample{last, 0.0})
+                             : std::lower_bound(sample_span_.begin(), 
sample_span_.end(), TSSample{last, 0.0});
+
+  size_t start_idx = start_it - sample_span_.begin();
+  size_t end_idx = end_it - sample_span_.begin();
+
+  if (start_idx < end_idx) {
+    return createSampleSlice(start_idx, end_idx);
+  }
+  return {};
+}
+
+SampleBatchSlice TSChunk::SampleBatchSlice::createSampleSlice(size_t 
start_idx, size_t end_idx) {
+  if (end_idx > sample_span_.size()) {
+    end_idx = sample_span_.size();
+  }
+  if (end_idx - start_idx == 0) {
+    return {};
+  }
+  return {nonstd::span<const TSSample>(&sample_span_[start_idx], end_idx - 
start_idx),
+          nonstd::span<AddResult>(&add_result_span_[start_idx], end_idx - 
start_idx), policy_};
+}
+
+SampleBatchSlice TSChunk::SampleBatch::AsSlice() { return {samples_, 
add_results_, policy_}; }
+
+std::vector<AddResult> TSChunk::SampleBatch::GetFinalResults() const {
+  std::vector<AddResult> res;
+  res.resize(add_results_.size());
+  for (size_t idx = 0; idx < add_results_.size(); idx++) {
+    res[indexes_[idx]] = add_results_[idx];
+  }
+  return res;
+}
+
+AddResult TSChunk::MergeSamplesValue(TSSample& to, const TSSample& from, 
DuplicatePolicy policy) {
+  if (to.ts != from.ts) {
+    return AddResult::kNone;
+  }
+
+  switch (policy) {
+    case DuplicatePolicy::BLOCK:
+      return AddResult::kBlock;
+    case DuplicatePolicy::FIRST:
+      return AddResult::kOk;
+    case DuplicatePolicy::LAST:
+      to.v = from.v;
+      return AddResult::kOk;
+    case DuplicatePolicy::MAX:
+      to.v = std::max(to.v, from.v);
+      return AddResult::kOk;
+    case DuplicatePolicy::MIN:
+      to.v = std::min(to.v, from.v);
+      return AddResult::kOk;
+    case DuplicatePolicy::SUM:
+      to.v += from.v;
+      return AddResult::kOk;
+  }
+
+  return AddResult::kNone;
+}
+
+uint32_t TSChunk::GetCount() const { return metadata_.count; }
+
+uint64_t TSChunk::SampleBatchSlice::GetFirstTimestamp() {
+  if (sample_span_.size() == 0) return 0;
+  for (size_t i = 0; i < sample_span_.size(); i++) {
+    if (add_result_span_[i] == AddResult::kNone) {
+      return sample_span_[i].ts;
+    }
+  }
+  return 0;
+}
+
+uint64_t TSChunk::SampleBatchSlice::GetLastTimestamp() {
+  if (sample_span_.size() == 0) return 0;
+  for (size_t i = sample_span_.size(); i-- > 0;) {  // `i >= 0` is always true for size_t: use `i-- > 0` to avoid wraparound OOB
+    if (add_result_span_[i] == AddResult::kNone) {
+      return sample_span_[i].ts;
+    }
+  }
+  return 0;
+}
+
+size_t TSChunk::SampleBatchSlice::GetValidCount() const {
+  size_t count = 0;
+  for (auto res : add_result_span_) {
+    if (res == AddResult::kNone) {
+      count++;
+    }
+  }
+  return count;
+}
+
+std::string TSChunk::MetaData::Encode() const {
+  std::string ret;
+  // Reserve the remaining flag bits for future use
+  uint32_t flag = 0;
+  flag |= (is_compressed ? uint32_t(1) : 0);
+  PutFixed32(&ret, flag);
+  PutFixed32(&ret, count);
+  return ret;
+}
+
+void TSChunk::MetaData::Decode(Slice* input) {
+  uint32_t flag = 0;
+  GetFixed32(input, &flag);
+  is_compressed = flag & uint32_t(1);
+  GetFixed32(input, &count);
+}
+
+TSChunk::TSChunk(nonstd::span<char> data) : data_(data) {
+  Slice input(data_.data(), data_.size());
+  metadata_.Decode(&input);
+}
+
+class UncompTSChunkIterator : public TSChunkIterator {
+ public:
+  explicit UncompTSChunkIterator(nonstd::span<TSSample> data, uint64_t count) 
: TSChunkIterator(count), data_(data) {}
+  std::optional<TSSample*> Next() override {
+    if (idx_ >= count_) return std::nullopt;
+    return &data_[idx_++];
+  }
+
+ private:
+  nonstd::span<TSSample> data_;
+};
+
+UncompTSChunk::UncompTSChunk(nonstd::span<char> data) : TSChunk(data) {
+  auto data_ptr = reinterpret_cast<char*>(data.data()) + 
TSChunk::MetaData::kEncodedSize;
+  samples_ = nonstd::span<TSSample>(reinterpret_cast<TSSample*>(data_ptr), 
metadata_.count);
+}
+
+std::unique_ptr<TSChunkIterator> UncompTSChunk::CreateIterator() const {
+  return std::make_unique<UncompTSChunkIterator>(samples_, metadata_.count);
+}
+
+uint64_t UncompTSChunk::GetFirstTimestamp() const {
+  if (metadata_.count == 0) {
+    return 0;
+  }
+  return samples_[0].ts;
+}
+
+uint64_t UncompTSChunk::GetLastTimestamp() const {
+  if (metadata_.count == 0) {
+    return 0;
+  }
+  return samples_[metadata_.count - 1].ts;
+}
+
+std::string UncompTSChunk::UpsertSamples(SampleBatchSlice batch) const {
+  const auto new_valid_count = batch.GetValidCount();
+  if (new_valid_count == 0) {
+    return "";
+  }
+
+  auto new_samples = batch.GetSampleSpan();
+  auto add_results = batch.GetAddResultSpan();
+  DuplicatePolicy policy = batch.GetPolicy();
+  const size_t existing_count = metadata_.count;
+
+  // Calculate buffer size: header + existing samples + unique new samples
+  const size_t header_size = TSChunk::MetaData::kEncodedSize;
+  const size_t required_size = header_size + (existing_count + 
new_valid_count) * sizeof(TSSample);
+
+  // Prepare new buffer
+  std::string new_buffer;
+  new_buffer.resize(required_size);
+  auto* merged_data = reinterpret_cast<TSSample*>(new_buffer.data() + 
header_size);
+
+  // Prepare iterators for merging
+  size_t new_sample_idx = 0;
+  auto existing_sample_iter = std::upper_bound(samples_.begin(), 
samples_.end(), new_samples[0]);
+
+  // Copy existing samples that are before the first new sample
+  const size_t preserved_count = std::distance(samples_.begin(), 
existing_sample_iter);
+  size_t current_index = preserved_count;
+  if (preserved_count > 0) {
+    std::memcpy(merged_data, samples_.data(), preserved_count * 
sizeof(TSSample));
+    current_index--;  // Point to last copied sample
+  } else {
+    current_index = -1;  // Special case: no preserved samples
+  }
+
+  // Merge samples from both sources
+  while (new_sample_idx != new_samples.size() && existing_sample_iter != 
samples_.end()) {
+    const TSSample* candidate = nullptr;
+    bool from_new_batch = false;
+
+    // Select next sample by earliest timestamp
+    if (existing_sample_iter->ts <= new_samples[new_sample_idx].ts) {
+      candidate = &(*existing_sample_iter);
+    } else {
+      candidate = &new_samples[new_sample_idx];
+      from_new_batch = true;
+    }
+    if (from_new_batch && add_results[new_sample_idx] != AddResult::kNone) {
+      new_sample_idx++;
+      continue;
+    }
+
+    // Handle first sample case
+    if (current_index == static_cast<size_t>(-1)) {
+      merged_data[0] = *candidate;
+      current_index = 0;
+      continue;
+    }
+
+    // Append or merge based on timestamp
+    if (candidate->ts > merged_data[current_index].ts) {
+      merged_data[++current_index] = *candidate;
+    } else {
+      if (from_new_batch) {
+        auto add_res = MergeSamplesValue(merged_data[current_index], 
*candidate, policy);
+        add_results[new_sample_idx] = add_res;
+      }
+    }
+
+    // Update the index
+    if (from_new_batch) {
+      new_sample_idx++;
+    } else {
+      existing_sample_iter++;
+    }
+  }
+
+  // Copy remaining existing samples
+  if (existing_sample_iter != samples_.end()) {
+    const size_t remaining_count = std::distance(existing_sample_iter, 
samples_.end());
+    std::memcpy(&merged_data[current_index + 1], &(*existing_sample_iter), 
remaining_count * sizeof(TSSample));
+    current_index += remaining_count;
+  }
+
+  // Process remaining new samples
+  while (new_sample_idx != new_samples.size()) {
+    if (add_results[new_sample_idx] != AddResult::kNone) {
+      ++new_sample_idx;
+      continue;
+    }
+    if (current_index == static_cast<size_t>(-1)) {
+      current_index = 0;
+      merged_data[current_index] = new_samples[new_sample_idx];
+    } else if (new_samples[new_sample_idx].ts > merged_data[current_index].ts) 
{
+      merged_data[++current_index] = new_samples[new_sample_idx];
+    } else {
+      auto add_res = MergeSamplesValue(merged_data[current_index], 
new_samples[new_sample_idx], policy);
+      add_results[new_sample_idx] = add_res;
+    }
+    ++new_sample_idx;
+  }
+
+  // Update metadata in buffer header
+  const size_t final_count = current_index + 1;
+  auto metadata = TSChunk::MetaData(false, 0);
+  metadata.count = final_count;
+  auto str = metadata.Encode();
+  EncodeBuffer(new_buffer.data(), str);
+
+  return new_buffer;
+}
+
+std::string UncompTSChunk::RemoveSamplesBetween(uint64_t from, uint64_t to) 
const {
+  if (from > to) {
+    return "";
+  }
+
+  // Find the range of samples to delete using binary search
+  auto start_it = std::lower_bound(samples_.begin(), samples_.end(), 
TSSample{from, 0.0});
+  if (start_it == samples_.end()) {
+    return "";
+  }
+  auto end_it = std::upper_bound(samples_.begin(), samples_.end(), 
TSSample{to, 0.0});
+
+  size_t start_idx = std::distance(samples_.begin(), start_it);
+  size_t end_idx = std::distance(samples_.begin(), end_it);
+
+  // Calculate buffer size: header + remaining samples
+  const size_t header_size = TSChunk::MetaData::kEncodedSize;
+  const size_t remaining_count = metadata_.count - (end_idx - start_idx);
+  const size_t required_size = header_size + remaining_count * 
sizeof(TSSample);
+
+  // Prepare new buffer
+  std::string new_buffer;
+  new_buffer.resize(required_size);
+
+  // Copy header + samples before deletion range
+  size_t part_size = header_size + start_idx * sizeof(TSSample);
+  std::memcpy(new_buffer.data(), data_.data(), part_size);
+
+  // Copy samples after deletion range
+  if (end_idx < metadata_.count) {
+    std::memcpy(new_buffer.data() + part_size,
+                reinterpret_cast<const char*>(samples_.data()) + end_idx * 
sizeof(TSSample),
+                (metadata_.count - end_idx) * sizeof(TSSample));
+  }
+
+  // Update metadata in buffer header
+  auto metadata = TSChunk::MetaData(false, remaining_count);
+  auto str = metadata.Encode();
+  EncodeBuffer(new_buffer.data(), str);
+
+  return new_buffer;
+}
+
+std::string UncompTSChunk::UpdateSampleValue(uint64_t ts, double value, bool 
is_add_on) const {
+  if (ts < GetFirstTimestamp() || ts > GetLastTimestamp()) {
+    return "";
+  }
+
+  // Find the position of the sample to update
+  auto it = std::lower_bound(samples_.begin(), samples_.end(), TSSample{ts, 
0.0});
+  if (it == samples_.end() || it->ts != ts) {
+    return "";  // Sample not found
+  }
+  auto cur_value = it->v;
+
+  std::string new_buffer = std::string(data_.data(), data_.size());
+  size_t header_size = TSChunk::MetaData::kEncodedSize;
+  auto* new_samples = reinterpret_cast<TSSample*>(new_buffer.data() + 
header_size);
+  auto idx = std::distance(samples_.begin(), it);
+  double new_value = is_add_on ? cur_value + value : value;
+  new_samples[idx] = TSSample{ts, new_value};
+
+  return new_buffer;
+}
diff --git a/src/types/timeseries.h b/src/types/timeseries.h
new file mode 100644
index 000000000..e5f82ebec
--- /dev/null
+++ b/src/types/timeseries.h
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include <nonstd/span.hpp>
+#include <optional>
+
+#include "storage/redis_metadata.h"
+
+class TSChunk;
+class UncompTSChunk;
+
+using TSChunkPtr = std::unique_ptr<TSChunk>;
+using OwnedTSChunk = std::tuple<TSChunkPtr, std::string>;
+
+// Creates a TSChunk from the provided raw data buffer.
+TSChunkPtr CreateTSChunkFromData(nonstd::span<char> data);
+
+// Creates an empty owned time series chunk with specified compression option.
+OwnedTSChunk CreateEmptyOwnedTSChunk(bool is_compressed = false);
+
+struct TSSample {
+  uint64_t ts;
+  double v;
+
+  static constexpr uint64_t MAX_TIMESTAMP = 
std::numeric_limits<uint64_t>::max();
+
+  // Custom comparison operator for sorting by ts
+  bool operator<(const TSSample& other) const { return ts < other.ts; }
+  bool operator==(const TSSample& other) const { return ts == other.ts; }
+};
+
+// Simple TSChunk iterator base class providing basic traversal functionality
+class TSChunkIterator {
+ public:
+  explicit TSChunkIterator(uint64_t count) : count_(count), idx_(0) {}
+  virtual ~TSChunkIterator() = default;
+
+  virtual std::optional<TSSample*> Next() = 0;
+  virtual bool HasNext() const { return idx_ < count_; }
+
+ protected:
+  uint64_t count_;
+  uint64_t idx_;
+};
+
+class TSChunk {
+ public:
+  using DuplicatePolicy = TimeSeriesMetadata::DuplicatePolicy;
+
+  enum class AddResult : uint8_t {
+    kNone,
+    kOk,
+    kBlock,
+    kOld,
+  };
+
+  class SampleBatch;
+  class SampleBatchSlice {
+   public:
+    nonstd::span<const TSSample> GetSampleSpan() const { return sample_span_; }
+    nonstd::span<AddResult> GetAddResultSpan() { return add_result_span_; }
+    nonstd::span<const AddResult> GetAddResultSpan() const { return 
add_result_span_; }
+
+    // Slice samples by count. Returns count valid samples starting from first 
timestamp
+    // e.g., samples: {100,200,300}, first=200, count=2 -> {200,300}
+    SampleBatchSlice SliceByCount(uint64_t first, int count, uint64_t* last_ts 
= nullptr);
+
+    // Slice samples by timestamp range [first, last)
+    // e.g., samples: {10,20,30,40}, first=20, last=40 -> {20,30}
+    SampleBatchSlice SliceByTimestamps(uint64_t first, uint64_t last, bool 
contain_last = false);
+
+    uint64_t GetFirstTimestamp();
+    uint64_t GetLastTimestamp();
+
+    // Get number of valid samples (excluding duplicates and expired entries)
+    size_t GetValidCount() const;
+
+    DuplicatePolicy GetPolicy() const { return policy_; }
+    size_t Size() const { return sample_span_.size(); }
+    bool Empty() const { return sample_span_.empty(); }
+
+    friend class TSChunk::SampleBatch;
+
+   private:
+    nonstd::span<const TSSample> sample_span_;
+    nonstd::span<AddResult> add_result_span_;
+    DuplicatePolicy policy_;
+
+    SampleBatchSlice() = default;
+    SampleBatchSlice(nonstd::span<const TSSample> samples, 
nonstd::span<AddResult> results, DuplicatePolicy policy)
+        : sample_span_(samples), add_result_span_(results), policy_(policy) {}
+
+    SampleBatchSlice createSampleSlice(size_t start_idx, size_t end_idx);
+  };
+
+  class SampleBatch {
+   public:
+    // Construct a batch of samples with duplicate policy
+    // Samples will be sorted and deduplicated according to policy
+    SampleBatch(std::vector<TSSample> samples, DuplicatePolicy policy);
+
+    // Mark samples as expired if ts + retention < last_ts
+    // e.g., retention=3600, last_ts=5000 -> samples before 5000-3600 are 
expired
+    void Expire(uint64_t last_ts, uint64_t retention);
+
+    SampleBatchSlice AsSlice();
+
+    // Return add results by samples' order
+    std::vector<AddResult> GetFinalResults() const;
+
+   private:
+    std::vector<TSSample> samples_;
+    std::vector<size_t> indexes_;  // Maps sorted position -> original input index (samples_ is reordered)
+    std::vector<AddResult> add_results_;
+    DuplicatePolicy policy_;
+
+    void sortAndOrganize();
+  };
+
+  struct MetaData {
+    constexpr static size_t kEncodedSize = 2 * sizeof(uint32_t);
+
+    bool is_compressed;
+    uint32_t count;
+
+    MetaData() = default;
+    MetaData(bool is_compressed, uint32_t count) : 
is_compressed(is_compressed), count(count) {}
+    std::string Encode() const;
+    void Decode(Slice* input);
+  };
+
+  explicit TSChunk(nonstd::span<char> data);
+
+  virtual ~TSChunk() = default;
+
+  // Merge samples with duplicate policy handling
+  // Returns result status, updates 'to' value according to policy
+  static AddResult MergeSamplesValue(TSSample& to, const TSSample& from, 
DuplicatePolicy policy);
+
+  virtual std::unique_ptr<TSChunkIterator> CreateIterator() const = 0;
+
+  uint32_t GetCount() const;
+  virtual uint64_t GetFirstTimestamp() const = 0;
+  virtual uint64_t GetLastTimestamp() const = 0;
+
+  // Add new samples to the chunk according to duplicate policy
+  // Returns new chunk data with merged samples. Returns empty string if no 
changes
+  virtual std::string UpsertSamples(SampleBatchSlice samples) const = 0;
+
+  // Delete samples in [from, to] timestamp range
+  // Returns new chunk data without deleted samples. Returns empty string if 
no changes
+  virtual std::string RemoveSamplesBetween(uint64_t from, uint64_t to) const = 
0;
+
+  // Update sample value at specified timestamp
+  // is_add_on controls whether to add to existing value or replace it
+  // Returns empty string if no changes
+  virtual std::string UpdateSampleValue(uint64_t ts, double value, bool 
is_add_on) const = 0;
+
+ protected:
+  nonstd::span<char> data_;
+  MetaData metadata_;
+};
+
+class UncompTSChunk : public TSChunk {
+ public:
+  explicit UncompTSChunk(nonstd::span<char> data);
+  std::unique_ptr<TSChunkIterator> CreateIterator() const override;
+
+  uint64_t GetFirstTimestamp() const override;
+  uint64_t GetLastTimestamp() const override;
+
+  std::string UpsertSamples(SampleBatchSlice samples) const override;
+  std::string RemoveSamplesBetween(uint64_t from, uint64_t to) const override;
+  std::string UpdateSampleValue(uint64_t ts, double value, bool is_add_on) 
const override;
+
+ private:
+  nonstd::span<TSSample> samples_;
+};
diff --git a/tests/cppunit/types/timeseries_chunk_test.cc 
b/tests/cppunit/types/timeseries_chunk_test.cc
new file mode 100644
index 000000000..dbbf0912e
--- /dev/null
+++ b/tests/cppunit/types/timeseries_chunk_test.cc
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <gtest/gtest.h>
+
+#include <random>
+#include <vector>
+
+#include "test_base.h"
+#include "types/timeseries.h"
+
+namespace test {
+
+using SampleBatch = TSChunk::SampleBatch;
+using SampleBatchSlice = TSChunk::SampleBatchSlice;
+using DuplicatePolicy = TSChunk::DuplicatePolicy;
+using AddResult = TSChunk::AddResult;
+
+// Helper function to generate TSSample with specific timestamp and value
+TSSample MakeSample(uint64_t timestamp, double value) {
+  TSSample sample;
+  sample.ts = timestamp;
+  sample.v = value;
+  return sample;
+}
+
+// Test different duplicate policies
+TEST(RedisTimeSeriesChunkTest, PolicyBehaviors) {
+  TSSample original = MakeSample(100, 1.0);
+  TSSample duplicate = MakeSample(100, 2.0);
+
+  // Test BLOCK policy
+  EXPECT_EQ(TSChunk::MergeSamplesValue(original, duplicate, 
DuplicatePolicy::BLOCK), AddResult::kBlock);
+  EXPECT_EQ(original.v, 1.0);
+
+  // Test LAST policy
+  EXPECT_EQ(TSChunk::MergeSamplesValue(original, duplicate, 
DuplicatePolicy::LAST), AddResult::kOk);
+  EXPECT_EQ(original.v, 2.0);
+
+  // Reset and test MAX policy
+  original.v = 1.0;
+  EXPECT_EQ(TSChunk::MergeSamplesValue(original, duplicate, 
DuplicatePolicy::MAX), AddResult::kOk);
+  EXPECT_EQ(original.v, 2.0);
+
+  // Reset and test MIN policy
+  original.v = 3.0;
+  EXPECT_EQ(TSChunk::MergeSamplesValue(original, duplicate, 
DuplicatePolicy::MIN), AddResult::kOk);
+  EXPECT_EQ(original.v, 2.0);
+
+  // Reset and test SUM policy
+  original.v = 1.0;
+  EXPECT_EQ(TSChunk::MergeSamplesValue(original, duplicate, 
DuplicatePolicy::SUM), AddResult::kOk);
+  EXPECT_EQ(original.v, 3.0);
+}
+
+// Test timestamp-based slicing operations
+TEST(RedisTimeSeriesChunkTest, TimestampSlicing) {
+  std::vector<TSSample> samples = {MakeSample(100, 1.0), MakeSample(200, 2.0), 
MakeSample(300, 3.0),
+                                   MakeSample(400, 4.0)};
+  SampleBatch batch(std::move(samples), DuplicatePolicy::LAST);
+  SampleBatchSlice slice = batch.AsSlice();
+
+  // Test SliceByTimestamps with contain_last=true
+  auto result_slice = slice.SliceByTimestamps(150, 300, true);
+  EXPECT_EQ(result_slice.GetValidCount(), 2);
+  EXPECT_EQ(result_slice.GetFirstTimestamp(), 200);
+  EXPECT_EQ(result_slice.GetLastTimestamp(), 300);
+
+  // Test SliceByTimestamps with contain_last=false
+  result_slice = slice.SliceByTimestamps(150, 300, false);
+  EXPECT_EQ(result_slice.GetValidCount(), 1);
+  EXPECT_EQ(result_slice.GetFirstTimestamp(), 200);
+  EXPECT_EQ(result_slice.GetLastTimestamp(), 200);
+
+  // Test SliceByCount
+  uint64_t last_ts = 0;
+  result_slice = slice.SliceByCount(200, 2, &last_ts);
+  EXPECT_EQ(result_slice.GetValidCount(), 2);
+  EXPECT_EQ(last_ts, 300);
+  EXPECT_EQ(result_slice.GetFirstTimestamp(), 200);
+  EXPECT_EQ(result_slice.GetLastTimestamp(), 300);
+}
+
+// Test expiration logic
+TEST(RedisTimeSeriesChunkTest, ExpirationLogic) {
+  std::vector<TSSample> samples = {MakeSample(200, 1.0), MakeSample(400, 2.0), 
MakeSample(100, 3.0),
+                                   MakeSample(150, 4.0)};
+  SampleBatch batch(samples, DuplicatePolicy::LAST);
+
+  // Set retention to 150, last_ts = 300
+  batch.Expire(300, 150);
+  auto results = batch.GetFinalResults();
+
+  // Only samples with ts >= 150 should be kept
+  EXPECT_EQ(results[0], AddResult::kNone);
+  EXPECT_EQ(results[1], AddResult::kNone);
+  EXPECT_EQ(results[2], AddResult::kOld);
+  EXPECT_EQ(results[3], AddResult::kOld);
+}
+
+// Test SampleBatch construction and sorting
+TEST(RedisTimeSeriesChunkTest, BatchSortingAndDeduplication) {
+  std::vector<TSSample> samples = {
+      MakeSample(300, 3.0), MakeSample(100, 1.0), MakeSample(200, 2.0), 
MakeSample(100, 4.0),  // Duplicate timestamp
+      MakeSample(200, 5.0)                                                     
                // Duplicate timestamp
+  };
+
+  // Test with BLOCK policy
+  SampleBatch batch(samples, DuplicatePolicy::BLOCK);
+  SampleBatchSlice slice = batch.AsSlice();
+
+  // Verify sorting
+  EXPECT_EQ(slice.GetFirstTimestamp(), 100);
+  EXPECT_EQ(slice.GetLastTimestamp(), 300);
+
+  // Verify deduplication
+  EXPECT_EQ(slice.GetValidCount(), 3);
+  auto results = batch.GetFinalResults();
+  EXPECT_EQ(results[0], AddResult::kNone);
+  EXPECT_EQ(results[1], AddResult::kNone);
+  EXPECT_EQ(results[2], AddResult::kNone);
+  EXPECT_EQ(results[3], AddResult::kBlock);
+  EXPECT_EQ(results[4], AddResult::kBlock);
+}
+
+// Test MAddSample merging logic with additional samples and content validation
+TEST(RedisTimeSeriesChunkTest, UcompChunkMAddSampleLogic) {
+  // Create base chunk
+  auto [chunk, data] = CreateEmptyOwnedTSChunk(false);
+
+  // Create test samples with multiple duplicates and new timestamps
+  std::vector<TSSample> new_samples = {
+      MakeSample(300, 3.0), MakeSample(100, 1.0), MakeSample(200, 2.0), 
MakeSample(100, 4.0),
+      MakeSample(200, 5.0), MakeSample(400, 4.0), MakeSample(100, 6.0)  // 
Additional duplicates and new timestamp
+  };
+  SampleBatch batch(new_samples, DuplicatePolicy::LAST);
+  SampleBatchSlice slice = batch.AsSlice();
+
+  // Merge samples into chunk
+  std::string result = chunk->UpsertSamples(slice);
+  EXPECT_FALSE(result.empty());
+
+  // Verify merged chunk metadata
+  auto new_chunk = CreateTSChunkFromData(result);
+  EXPECT_EQ(new_chunk->GetCount(), 4);  // 100, 200, 300, 400 (with duplicates 
removed)
+  EXPECT_EQ(new_chunk->GetFirstTimestamp(), 100);
+  EXPECT_EQ(new_chunk->GetLastTimestamp(), 400);
+
+  // Validate content of merged chunk
+  auto iter = new_chunk->CreateIterator();
+  auto* sample = iter->Next().value();
+  EXPECT_EQ(sample->ts, 100);
+  EXPECT_EQ(sample->v, 6.0);  // Latest value for 100
+  sample = iter->Next().value();
+  EXPECT_EQ(sample->ts, 200);
+  EXPECT_EQ(sample->v, 5.0);  // Latest value for 200
+  sample = iter->Next().value();
+  EXPECT_EQ(sample->ts, 300);
+  EXPECT_EQ(sample->v, 3.0);
+  sample = iter->Next().value();
+  EXPECT_EQ(sample->ts, 400);
+  EXPECT_EQ(sample->v, 4.0);
+  EXPECT_EQ(iter->HasNext(), false);
+  EXPECT_EQ(iter->Next(), std::nullopt);
+}
+
+// Test UpsertSamples with a chunk that has existing samples
+TEST(RedisTimeSeriesChunkTest, UcompChunkMAddSampleWithExistingSamples) {
+  // Create empty chunk
+  auto [chunk, data] = CreateEmptyOwnedTSChunk(false);
+
+  // Initialize chunk with samples: 100, 200, 300
+  std::vector<TSSample> initial_samples = {MakeSample(100, 1.0), 
MakeSample(200, 2.0), MakeSample(300, 3.0)};
+  SampleBatch initial_batch(initial_samples, DuplicatePolicy::LAST);
+  SampleBatchSlice initial_slice = initial_batch.AsSlice();
+  std::string merged_data = chunk->UpsertSamples(initial_slice);
+  ASSERT_FALSE(merged_data.empty());
+
+  // New samples to add: 50(new), 150(new), 200(update), 300(update), 400(new)
+  std::vector<TSSample> new_samples = {MakeSample(50, 0.5), MakeSample(150, 
1.5), MakeSample(200, 2.5),
+                                       MakeSample(300, 3.5), MakeSample(400, 
4.0)};
+  SampleBatch new_batch(new_samples, DuplicatePolicy::LAST);
+  SampleBatchSlice new_slice = new_batch.AsSlice();
+
+  // Perform merge
+  merged_data = CreateTSChunkFromData(merged_data)->UpsertSamples(new_slice);
+  ASSERT_FALSE(merged_data.empty());
+
+  // Validate final state
+  TSChunkPtr final_chunk = CreateTSChunkFromData(merged_data);
+  EXPECT_EQ(final_chunk->GetCount(), 6);
+  EXPECT_EQ(final_chunk->GetFirstTimestamp(), 50);
+  EXPECT_EQ(final_chunk->GetLastTimestamp(), 400);
+
+  // Verify content through iterator
+  auto iter = final_chunk->CreateIterator();
+  auto* sample = iter->Next().value();
+  EXPECT_EQ(sample->ts, 50);
+  EXPECT_EQ(sample->v, 0.5);
+
+  sample = iter->Next().value();
+  EXPECT_EQ(sample->ts, 100);
+  EXPECT_EQ(sample->v, 1.0);
+
+  sample = iter->Next().value();
+  EXPECT_EQ(sample->ts, 150);
+  EXPECT_EQ(sample->v, 1.5);
+
+  sample = iter->Next().value();
+  EXPECT_EQ(sample->ts, 200);
+  EXPECT_EQ(sample->v, 2.5);
+
+  sample = iter->Next().value();
+  EXPECT_EQ(sample->ts, 300);
+  EXPECT_EQ(sample->v, 3.5);
+
+  sample = iter->Next().value();
+  EXPECT_EQ(sample->ts, 400);
+  EXPECT_EQ(sample->v, 4.0);
+
+  EXPECT_FALSE(iter->HasNext());
+}
+
+// Test RemoveSamplesBetween with different deletion ranges and content 
validation
+TEST(RedisTimeSeriesChunkTest, UcompChunkDeletionRange) {
+  // Create base chunk with samples: 100, 200, 300, 400, 500
+  auto [chunk, data] = CreateEmptyOwnedTSChunk(false);
+  std::vector<TSSample> initial_samples = {MakeSample(100, 1.0), 
MakeSample(200, 2.0), MakeSample(300, 3.0),
+                                           MakeSample(400, 4.0), 
MakeSample(500, 5.0)};
+  SampleBatch initial_batch(initial_samples, DuplicatePolicy::LAST);
+  SampleBatchSlice initial_slice = initial_batch.AsSlice();
+  std::string merged_data = chunk->UpsertSamples(initial_slice);
+  ASSERT_FALSE(merged_data.empty());
+  TSChunkPtr test_chunk = CreateTSChunkFromData(merged_data);
+
+  // Test 1: Delete middle range (200-400 inclusive)
+  std::string deleted_data = test_chunk->RemoveSamplesBetween(200, 400);
+  ASSERT_FALSE(deleted_data.empty());
+  TSChunkPtr result_chunk = CreateTSChunkFromData(deleted_data);
+  EXPECT_EQ(result_chunk->GetCount(), 2);
+
+  auto iter = result_chunk->CreateIterator();
+  auto* sample = iter->Next().value();
+  EXPECT_EQ(sample->ts, 100);
+  EXPECT_EQ(sample->v, 1.0);
+  sample = iter->Next().value();
+  EXPECT_EQ(sample->ts, 500);
+  EXPECT_EQ(sample->v, 5.0);
+  EXPECT_FALSE(iter->HasNext());
+
+  // Test 2: Delete range (300,1000)
+  deleted_data = test_chunk->RemoveSamplesBetween(300, 1000);
+  ASSERT_FALSE(deleted_data.empty());
+  result_chunk = CreateTSChunkFromData(deleted_data);
+  EXPECT_EQ(result_chunk->GetCount(), 2);
+  EXPECT_EQ(result_chunk->GetFirstTimestamp(), 100);
+  EXPECT_EQ(result_chunk->GetLastTimestamp(), 200);
+
+  // Test 3: Delete range (0, 300)
+  deleted_data = test_chunk->RemoveSamplesBetween(0, 300);
+  ASSERT_FALSE(deleted_data.empty());
+  result_chunk = CreateTSChunkFromData(deleted_data);
+  EXPECT_EQ(result_chunk->GetCount(), 2);
+  EXPECT_EQ(result_chunk->GetFirstTimestamp(), 400);
+  EXPECT_EQ(result_chunk->GetLastTimestamp(), 500);
+
+  // Test 4: Delete entire range (100-500)
+  deleted_data = test_chunk->RemoveSamplesBetween(0, 1000);
+  ASSERT_FALSE(deleted_data.empty());
+  result_chunk = CreateTSChunkFromData(deleted_data);
+  EXPECT_EQ(result_chunk->GetCount(), 0);
+  EXPECT_EQ(result_chunk->GetFirstTimestamp(), 0);
+  EXPECT_EQ(result_chunk->GetLastTimestamp(), 0);
+
+  // Test 5: Delete from > to (should return original data)
+  deleted_data = test_chunk->RemoveSamplesBetween(500, 100);
+  EXPECT_TRUE(deleted_data.empty());
+
+  // Test 6: Delete single timestamp (300)
+  deleted_data = test_chunk->RemoveSamplesBetween(300, 300);
+  ASSERT_FALSE(deleted_data.empty());
+  result_chunk = CreateTSChunkFromData(deleted_data);
+  EXPECT_EQ(result_chunk->GetCount(), 4);
+
+  iter = result_chunk->CreateIterator();
+  sample = iter->Next().value();
+  EXPECT_EQ(sample->ts, 100);
+  sample = iter->Next().value();
+  EXPECT_EQ(sample->ts, 200);
+  sample = iter->Next().value();
+  EXPECT_EQ(sample->ts, 400);
+  sample = iter->Next().value();
+  EXPECT_EQ(sample->ts, 500);
+  EXPECT_FALSE(iter->HasNext());
+}
+
+// Test UpdateSampleValue with different update scenarios and validation
+TEST(RedisTimeSeriesChunkTest, UpdateSampleBehavior) {
+  // Initialize chunk with samples: 100, 200, 300
+  auto [chunk, data] = CreateEmptyOwnedTSChunk(false);
+  std::vector<TSSample> initial_samples = {MakeSample(100, 1.0), 
MakeSample(200, 2.0), MakeSample(300, 3.0)};
+  SampleBatch initial_batch(initial_samples, DuplicatePolicy::LAST);
+  SampleBatchSlice initial_slice = initial_batch.AsSlice();
+  std::string merged_data = chunk->UpsertSamples(initial_slice);
+  ASSERT_FALSE(merged_data.empty());
+  TSChunkPtr test_chunk = CreateTSChunkFromData(merged_data);
+
+  // Test 1: Update existing sample with replace (is_add_on = false)
+  std::string updated_data = test_chunk->UpdateSampleValue(200, 5.0, false);
+  ASSERT_FALSE(updated_data.empty());
+  TSChunkPtr result_chunk = CreateTSChunkFromData(updated_data);
+  EXPECT_EQ(result_chunk->GetCount(), 3);
+
+  auto iter = result_chunk->CreateIterator();
+  auto* sample = iter->Next().value();
+  EXPECT_EQ(sample->ts, 100);
+  EXPECT_EQ(sample->v, 1.0);
+
+  sample = iter->Next().value();
+  EXPECT_EQ(sample->ts, 200);
+  EXPECT_EQ(sample->v, 5.0);  // Value should be replaced
+
+  sample = iter->Next().value();
+  EXPECT_EQ(sample->ts, 300);
+  EXPECT_EQ(sample->v, 3.0);
+
+  // Test 2: Update existing sample with add-on (is_add_on = true)
+  updated_data = test_chunk->UpdateSampleValue(200, 1.5, true);
+  ASSERT_FALSE(updated_data.empty());
+  result_chunk = CreateTSChunkFromData(updated_data);
+
+  iter = result_chunk->CreateIterator();
+  sample = iter->Next().value();
+  EXPECT_EQ(sample->ts, 100);
+  EXPECT_EQ(sample->v, 1.0);
+
+  sample = iter->Next().value();
+  EXPECT_EQ(sample->ts, 200);
+  EXPECT_EQ(sample->v, 3.5);  // 2.0 + 1.5
+
+  sample = iter->Next().value();
+  EXPECT_EQ(sample->ts, 300);
+  EXPECT_EQ(sample->v, 3.0);
+
+  // Test 3: Update non-existent sample
+  updated_data = test_chunk->UpdateSampleValue(400, 5.0, false);
+  EXPECT_TRUE(updated_data.empty());  // Should return empty buffer
+
+  // Test 4: Update sample out of range (before first)
+  updated_data = test_chunk->UpdateSampleValue(50, 0.5, false);
+  EXPECT_TRUE(updated_data.empty());
+
+  // Test 5: Update sample out of range (after last)
+  updated_data = test_chunk->UpdateSampleValue(500, 5.0, false);
+  EXPECT_TRUE(updated_data.empty());
+}
+
+}  // namespace test


Reply via email to