This is an automated email from the ASF dual-hosted git repository.
colinlee pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new 9374f727 Remove timestamps cache to optimize init phase (#769)
9374f727 is described below
commit 9374f72709d6225598ff7517d0106351c35d647b
Author: YangCaiyin <[email protected]>
AuthorDate: Thu Apr 9 17:52:18 2026 +0800
Remove timestamps cache to optimize init phase (#769)
* remove timestamps cache to optimize init phase
* add timeline statistics
* add timeline statistics
* fix CI
* spotless
* spotless
* fix by review
* change timeline statistics type
---
cpp/src/cwrapper/tsfile_cwrapper.cc | 113 ++++++++++++++++-
cpp/src/cwrapper/tsfile_cwrapper.h | 1 +
cpp/src/file/tsfile_io_reader.cc | 53 +++++++-
cpp/src/reader/tsfile_reader.cc | 41 +++++--
cpp/src/reader/tsfile_reader.h | 2 +
python/tests/test_reader_metadata.py | 64 +++++++++-
python/tests/test_tsfile_dataset.py | 93 ++++++++++++--
python/tsfile/__init__.py | 7 +-
python/tsfile/dataset/dataframe.py | 229 ++++++++++++++++++++++++++++++++---
python/tsfile/dataset/metadata.py | 34 +++---
python/tsfile/dataset/reader.py | 224 ++++++++++++++++++++--------------
python/tsfile/dataset/timeseries.py | 41 ++++---
python/tsfile/schema.py | 1 +
python/tsfile/tsfile_cpp.pxd | 1 +
python/tsfile/tsfile_py_cpp.pyx | 2 +
python/tsfile/tsfile_reader.pyx | 1 +
16 files changed, 737 insertions(+), 170 deletions(-)
diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc
b/cpp/src/cwrapper/tsfile_cwrapper.cc
index e1b5617a..99db6104 100644
--- a/cpp/src/cwrapper/tsfile_cwrapper.cc
+++ b/cpp/src/cwrapper/tsfile_cwrapper.cc
@@ -970,6 +970,81 @@ int fill_timeseries_statistic(storage::Statistic* st,
return common::E_OK;
}
+int fill_timeline_statistic(storage::ITimeseriesIndex* idx,
+ TimeseriesStatistic* out) {
+ clear_timeseries_statistic(out);
+ if (idx == nullptr) {
+ return common::E_OK;
+ }
+
+ auto* aligned_idx = dynamic_cast<storage::AlignedTimeseriesIndex*>(idx);
+ if (aligned_idx != nullptr && aligned_idx->time_ts_idx_ != nullptr &&
+ aligned_idx->time_ts_idx_->get_statistic() != nullptr) {
+ auto* st = aligned_idx->time_ts_idx_->get_statistic();
+ TsFileStatisticBase* b = tsfile_statistic_base(out);
+ b->has_statistic = true;
+ b->type = TS_DATATYPE_VECTOR;
+ b->row_count = st->get_count();
+ b->start_time = st->start_time_;
+ b->end_time = st->get_end_time();
+ return common::E_OK;
+ }
+
+ if (idx->get_statistic() != nullptr &&
+ idx->get_time_chunk_meta_list() == nullptr) {
+ auto* st = idx->get_statistic();
+ TsFileStatisticBase* b = tsfile_statistic_base(out);
+ b->has_statistic = true;
+ b->type = TS_DATATYPE_VECTOR;
+ b->row_count = st->get_count();
+ b->start_time = st->start_time_;
+ b->end_time = st->get_end_time();
+ return common::E_OK;
+ }
+
+ auto* list = idx->get_time_chunk_meta_list();
+ if (list == nullptr) {
+ list = idx->get_chunk_meta_list();
+ }
+ if (list == nullptr) {
+ return common::E_OK;
+ }
+
+ int64_t row_count = 0;
+ int64_t start_time = 0;
+ int64_t end_time = 0;
+ bool has_statistic = false;
+ for (auto it = list->begin(); it != list->end(); it++) {
+ auto* chunk_meta = it.get();
+ if (chunk_meta == nullptr || chunk_meta->statistic_ == nullptr ||
+ chunk_meta->statistic_->count_ <= 0) {
+ continue;
+ }
+ if (!has_statistic) {
+ start_time = chunk_meta->statistic_->start_time_;
+ end_time = chunk_meta->statistic_->end_time_;
+ has_statistic = true;
+ } else {
+ start_time =
+ std::min(start_time, chunk_meta->statistic_->start_time_);
+ end_time = std::max(end_time, chunk_meta->statistic_->end_time_);
+ }
+ row_count += chunk_meta->statistic_->count_;
+ }
+
+ if (!has_statistic) {
+ return common::E_OK;
+ }
+
+ TsFileStatisticBase* b = tsfile_statistic_base(out);
+ b->has_statistic = true;
+ b->type = TS_DATATYPE_VECTOR;
+ b->row_count = row_count;
+ b->start_time = start_time;
+ b->end_time = end_time;
+ return common::E_OK;
+}
+
void free_device_timeseries_metadata_entries_partial(
DeviceTimeseriesMetadataEntry* entries, size_t filled_count) {
if (entries == nullptr) {
@@ -981,6 +1056,8 @@ void free_device_timeseries_metadata_entries_partial(
for (uint32_t j = 0; j < entries[i].timeseries_count; j++) {
free_timeseries_statistic_heap(
&entries[i].timeseries[j].statistic);
+ free_timeseries_statistic_heap(
+ &entries[i].timeseries[j].timeline_statistic);
free(entries[i].timeseries[j].measurement_name);
}
free(entries[i].timeseries);
@@ -1139,10 +1216,19 @@ ERRNO populate_c_metadata_map_from_cpp(
free_device_timeseries_metadata_entries_partial(entries, di);
return common::E_OOM;
}
- m.data_type = static_cast<TSDataType>(idx->get_data_type());
+ auto* aligned_idx =
+ dynamic_cast<storage::AlignedTimeseriesIndex*>(idx.get());
+ if (aligned_idx != nullptr &&
+ aligned_idx->value_ts_idx_ != nullptr) {
+ m.data_type = static_cast<TSDataType>(
+ aligned_idx->value_ts_idx_->get_data_type());
+ } else {
+ m.data_type = static_cast<TSDataType>(idx->get_data_type());
+ }
storage::Statistic* st = idx->get_statistic();
int32_t chunk_cnt = 0;
- auto* cl = idx->get_chunk_meta_list();
+ auto* cl = aligned_idx != nullptr ?
idx->get_value_chunk_meta_list()
+ : idx->get_chunk_meta_list();
if (cl != nullptr) {
chunk_cnt = static_cast<int32_t>(cl->size());
}
@@ -1151,9 +1237,12 @@ ERRNO populate_c_metadata_map_from_cpp(
if (st_rc != common::E_OK) {
for (uint32_t u = 0; u < slot; u++) {
free_timeseries_statistic_heap(&e.timeseries[u].statistic);
+ free_timeseries_statistic_heap(
+ &e.timeseries[u].timeline_statistic);
free(e.timeseries[u].measurement_name);
}
free_timeseries_statistic_heap(&m.statistic);
+ free_timeseries_statistic_heap(&m.timeline_statistic);
free(m.measurement_name);
free(e.timeseries);
e.timeseries = nullptr;
@@ -1161,6 +1250,24 @@ ERRNO populate_c_metadata_map_from_cpp(
free_device_timeseries_metadata_entries_partial(entries, di);
return st_rc;
}
+ const int timeline_st_rc =
+ fill_timeline_statistic(idx.get(), &m.timeline_statistic);
+ if (timeline_st_rc != common::E_OK) {
+ for (uint32_t u = 0; u < slot; u++) {
+ free_timeseries_statistic_heap(&e.timeseries[u].statistic);
+ free_timeseries_statistic_heap(
+ &e.timeseries[u].timeline_statistic);
+ free(e.timeseries[u].measurement_name);
+ }
+ free_timeseries_statistic_heap(&m.statistic);
+ free_timeseries_statistic_heap(&m.timeline_statistic);
+ free(m.measurement_name);
+ free(e.timeseries);
+ e.timeseries = nullptr;
+ clear_metadata_entry_device_only(&e);
+ free_device_timeseries_metadata_entries_partial(entries, di);
+ return timeline_st_rc;
+ }
slot++;
}
di++;
@@ -1591,4 +1698,4 @@ ResultSet tsfile_query_table_with_tag_filter(
#ifdef __cplusplus
}
-#endif
\ No newline at end of file
+#endif
diff --git a/cpp/src/cwrapper/tsfile_cwrapper.h
b/cpp/src/cwrapper/tsfile_cwrapper.h
index aa7fd029..ae3e28ee 100644
--- a/cpp/src/cwrapper/tsfile_cwrapper.h
+++ b/cpp/src/cwrapper/tsfile_cwrapper.h
@@ -193,6 +193,7 @@ typedef struct TimeseriesMetadata {
TSDataType data_type;
int32_t chunk_meta_count;
TimeseriesStatistic statistic;
+ TimeseriesStatistic timeline_statistic;
} TimeseriesMetadata;
/**
diff --git a/cpp/src/file/tsfile_io_reader.cc b/cpp/src/file/tsfile_io_reader.cc
index 31d4a303..e96008a4 100644
--- a/cpp/src/file/tsfile_io_reader.cc
+++ b/cpp/src/file/tsfile_io_reader.cc
@@ -96,14 +96,61 @@ int
TsFileIOReader::get_device_timeseries_meta_without_chunk_meta(
int64_t end_offset;
std::vector<std::pair<std::shared_ptr<IMetaIndexEntry>, int64_t>>
meta_index_entry_list;
+ std::shared_ptr<MetaIndexNode> top_node;
+ bool is_aligned = false;
+ TimeseriesIndex* time_timeseries_index = nullptr;
if (RET_FAIL(load_device_index_entry(
std::make_shared<DeviceIDComparable>(device_id), meta_index_entry,
end_offset))) {
- } else if (RET_FAIL(load_all_measurement_index_entry(
- meta_index_entry->get_offset(), end_offset, pa,
- meta_index_entry_list))) {
+ } else {
+ int64_t start_offset = meta_index_entry->get_offset();
+ ASSERT(start_offset < end_offset);
+ const int32_t read_size = end_offset - start_offset;
+ int32_t ret_read_len = 0;
+ char* data_buf = (char*)pa.alloc(read_size);
+ void* m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode));
+ if (IS_NULL(data_buf) || IS_NULL(m_idx_node_buf)) {
+ return E_OOM;
+ }
+ auto* top_node_ptr = new (m_idx_node_buf) MetaIndexNode(&pa);
+ top_node = std::shared_ptr<MetaIndexNode>(top_node_ptr,
+ MetaIndexNode::self_deleter);
+ if (RET_FAIL(read_file_->read(start_offset, data_buf, read_size,
+ ret_read_len))) {
+ } else if (RET_FAIL(top_node->deserialize_from(data_buf, read_size))) {
+ } else {
+ is_aligned = is_aligned_device(top_node);
+ if (is_aligned) {
+ if (RET_FAIL(get_time_column_metadata(
+ top_node, time_timeseries_index, pa))) {
+ return ret;
+ }
+ }
+ }
+ }
+ if (RET_FAIL(ret)) {
+ return ret;
+ }
+ if (RET_FAIL(load_all_measurement_index_entry(
+ meta_index_entry->get_offset(), end_offset, pa,
+ meta_index_entry_list))) {
} else if (RET_FAIL(do_load_all_timeseries_index(meta_index_entry_list, pa,
timeseries_indexs))) {
+ } else if (is_aligned && time_timeseries_index != nullptr) {
+ for (size_t i = 0; i < timeseries_indexs.size(); i++) {
+ void* buf = pa.alloc(sizeof(AlignedTimeseriesIndex));
+ if (IS_NULL(buf)) {
+ return E_OOM;
+ }
+ auto* aligned_ts_idx = new (buf) AlignedTimeseriesIndex;
+ aligned_ts_idx->time_ts_idx_ = time_timeseries_index;
+ aligned_ts_idx->value_ts_idx_ =
+ dynamic_cast<TimeseriesIndex*>(timeseries_indexs[i]);
+ if (aligned_ts_idx->value_ts_idx_ == nullptr) {
+ return E_TYPE_NOT_MATCH;
+ }
+ timeseries_indexs[i] = aligned_ts_idx;
+ }
}
return ret;
}
diff --git a/cpp/src/reader/tsfile_reader.cc b/cpp/src/reader/tsfile_reader.cc
index ea7b2cc0..cabf02b0 100644
--- a/cpp/src/reader/tsfile_reader.cc
+++ b/cpp/src/reader/tsfile_reader.cc
@@ -29,7 +29,8 @@ namespace storage {
TsFileReader::TsFileReader()
: read_file_(nullptr),
tsfile_executor_(nullptr),
- table_query_executor_(nullptr) {
+ table_query_executor_(nullptr),
+ table_query_executor_batch_size_(0) {
tsfile_reader_meta_pa_.init(512, MOD_TSFILE_READER);
}
@@ -57,6 +58,7 @@ int TsFileReader::close() {
delete table_query_executor_;
table_query_executor_ = nullptr;
}
+ table_query_executor_batch_size_ = 0;
if (read_file_ != nullptr) {
read_file_->close();
delete read_file_;
@@ -65,6 +67,22 @@ int TsFileReader::close() {
return ret;
}
+int TsFileReader::ensure_table_query_executor(int batch_size) {
+ if (table_query_executor_ != nullptr &&
+ table_query_executor_batch_size_ == batch_size) {
+ return E_OK;
+ }
+
+ if (table_query_executor_ != nullptr) {
+ delete table_query_executor_;
+ table_query_executor_ = nullptr;
+ }
+
+ table_query_executor_ = new TableQueryExecutor(read_file_, batch_size);
+ table_query_executor_batch_size_ = batch_size;
+ return E_OK;
+}
+
int TsFileReader::query(QueryExpression* qe, ResultSet*& ret_qds) {
return tsfile_executor_->execute(qe, ret_qds);
}
@@ -110,9 +128,7 @@ int TsFileReader::query(const std::string& table_name,
}
Filter* time_filter = new TimeBetween(start_time, end_time, false);
- if (table_query_executor_ == nullptr) {
- table_query_executor_ = new TableQueryExecutor(read_file_, batch_size);
- }
+ ensure_table_query_executor(batch_size);
ret = table_query_executor_->query(to_lower(table_name), columns_names,
time_filter, tag_filter, nullptr,
result_set);
@@ -147,9 +163,7 @@ int TsFileReader::queryByRow(const std::string& table_name,
return E_TABLE_NOT_EXIST;
}
- if (table_query_executor_ == nullptr) {
- table_query_executor_ = new TableQueryExecutor(read_file_, batch_size);
- }
+ ensure_table_query_executor(batch_size);
ret = table_query_executor_->query(to_lower(table_name), column_names,
/*time_filter=*/nullptr, tag_filter,
/*field_filter=*/nullptr, offset, limit,
@@ -228,9 +242,7 @@ int TsFileReader::query_table_on_tree(
columns_names[i] = "col_" + std::to_string(i);
}
Filter* time_filter = new TimeBetween(star_time, end_time, false);
- if (table_query_executor_ == nullptr) {
- table_query_executor_ = new TableQueryExecutor(read_file_);
- }
+ ensure_table_query_executor(-1);
ret = table_query_executor_->query_on_tree(
satisfied_device_ids, columns_names, measurement_names_to_query,
time_filter, result_set);
@@ -334,9 +346,16 @@ int TsFileReader::get_timeseries_schema(
device_id, timeseries_indexs, pa))) {
} else {
for (auto timeseries_index : timeseries_indexs) {
+ auto* aligned_timeseries_index =
+ dynamic_cast<AlignedTimeseriesIndex*>(timeseries_index);
+ auto data_type =
+ aligned_timeseries_index != nullptr &&
+ aligned_timeseries_index->value_ts_idx_ != nullptr
+ ? aligned_timeseries_index->value_ts_idx_->get_data_type()
+ : timeseries_index->get_data_type();
MeasurementSchema ms(
timeseries_index->get_measurement_name().to_std_string(),
- timeseries_index->get_data_type());
+ data_type);
result.push_back(ms);
}
}
diff --git a/cpp/src/reader/tsfile_reader.h b/cpp/src/reader/tsfile_reader.h
index e78a38ac..19d83ec6 100644
--- a/cpp/src/reader/tsfile_reader.h
+++ b/cpp/src/reader/tsfile_reader.h
@@ -233,6 +233,7 @@ class TsFileReader {
std::vector<std::shared_ptr<TableSchema>> get_all_table_schemas();
private:
+ int ensure_table_query_executor(int batch_size);
int get_timeseries_metadata_impl(
std::shared_ptr<IDeviceID> device_id,
std::vector<std::shared_ptr<ITimeseriesIndex>>& result);
@@ -242,6 +243,7 @@ class TsFileReader {
storage::ReadFile* read_file_;
storage::TsFileExecutor* tsfile_executor_;
storage::TableQueryExecutor* table_query_executor_;
+ int table_query_executor_batch_size_;
common::PageArena tsfile_reader_meta_pa_;
};
diff --git a/python/tests/test_reader_metadata.py
b/python/tests/test_reader_metadata.py
index 558fcbb1..c8fdf571 100644
--- a/python/tests/test_reader_metadata.py
+++ b/python/tests/test_reader_metadata.py
@@ -18,9 +18,20 @@
import os
import tempfile
+import pandas as pd
import pytest
-from tsfile import Field, RowRecord, TimeseriesSchema, TsFileReader,
TsFileWriter
+from tsfile import (
+ ColumnCategory,
+ ColumnSchema,
+ Field,
+ RowRecord,
+ TableSchema,
+ TimeseriesSchema,
+ TsFileReader,
+ TsFileTableWriter,
+ TsFileWriter,
+)
from tsfile import TSDataType
from tsfile.schema import (
BoolTimeseriesStatistic,
@@ -210,3 +221,54 @@ def test_get_timeseries_metadata_string_statistic():
os.unlink(path)
except OSError:
pass
+
+
+def test_get_timeseries_metadata_table_timeline_statistic_keeps_null_rows():
+ path = os.path.join(tempfile.gettempdir(),
"py_reader_metadata_table_timeline.tsfile")
+ try:
+ os.unlink(path)
+ except OSError:
+ pass
+
+ schema = TableSchema(
+ "weather",
+ [
+ ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
+ ColumnSchema("temperature", TSDataType.DOUBLE,
ColumnCategory.FIELD),
+ ColumnSchema("humidity", TSDataType.DOUBLE, ColumnCategory.FIELD),
+ ],
+ )
+ df = pd.DataFrame(
+ {
+ "time": [0, 1],
+ "device": ["device_a", "device_a"],
+ "temperature": [float("nan"), 21.0],
+ "humidity": [50.0, 51.0],
+ }
+ )
+ with TsFileTableWriter(path, schema) as writer:
+ writer.write_dataframe(df)
+
+ reader = TsFileReader(path)
+ try:
+ meta_all = reader.get_timeseries_metadata(None)
+ group = meta_all[next(iter(meta_all))]
+ by_name = {item.measurement_name: item for item in group.timeseries}
+
+ temperature = by_name["temperature"]
+ assert temperature.statistic.row_count == 1
+ assert temperature.statistic.start_time == 1
+ assert temperature.statistic.end_time == 1
+ assert temperature.timeline_statistic.row_count == 2
+ assert temperature.timeline_statistic.start_time == 0
+ assert temperature.timeline_statistic.end_time == 1
+
+ humidity = by_name["humidity"]
+ assert humidity.statistic.row_count == 2
+ assert humidity.timeline_statistic.row_count == 2
+ finally:
+ reader.close()
+ try:
+ os.unlink(path)
+ except OSError:
+ pass
diff --git a/python/tests/test_tsfile_dataset.py
b/python/tests/test_tsfile_dataset.py
index 63cd439a..379bb547 100644
--- a/python/tests/test_tsfile_dataset.py
+++ b/python/tests/test_tsfile_dataset.py
@@ -20,9 +20,11 @@ import numpy as np
import pandas as pd
import pytest
+from tsfile.dataset import dataframe as dataframe_module
from tsfile import ColumnCategory, ColumnSchema, TSDataType, TableSchema,
TsFileTableWriter
from tsfile import AlignedTimeseries, Timeseries, TsFileDataFrame
from tsfile.dataset.formatting import format_timestamp
+from tsfile.dataset.metadata import MetadataCatalog, build_series_path,
resolve_series_path
from tsfile.dataset.reader import TsFileSeriesReader
@@ -47,6 +49,20 @@ def _write_weather_file(path, start):
writer.write_dataframe(df)
+def _write_weather_rows_file(path, rows):
+ schema = TableSchema(
+ "weather",
+ [
+ ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
+ ColumnSchema("temperature", TSDataType.DOUBLE,
ColumnCategory.FIELD),
+ ColumnSchema("humidity", TSDataType.DOUBLE, ColumnCategory.FIELD),
+ ],
+ )
+ df = pd.DataFrame(rows)
+ with TsFileTableWriter(str(path), schema) as writer:
+ writer.write_dataframe(df)
+
+
def _write_numeric_and_text_file(path):
schema = TableSchema(
"weather",
@@ -222,10 +238,14 @@ def
test_dataset_exposes_only_numeric_fields_and_keeps_nan(tmp_path):
series = tsdf[0]
assert series.name == "weather.device_a.temperature"
+ assert len(series) == 3
+ assert series.stats == {"start_time": 0, "end_time": 2, "count": 3}
assert np.isnan(series[1])
- sliced = series[:3]
+ np.testing.assert_array_equal(series.timestamps, np.array([0, 1, 2],
dtype=np.int64))
+ sliced = series[:]
assert sliced.shape == (3,)
assert np.isnan(sliced[1])
+ assert sliced[2] == 23.5
assert series[1:1].shape == (0,)
@@ -260,8 +280,45 @@ def
test_dataset_rejects_duplicate_timestamps_across_shards(tmp_path):
_write_weather_file(path1, 0)
_write_weather_file(path2, 2)
- with pytest.raises(ValueError, match="Duplicate timestamp"):
- TsFileDataFrame([str(path1), str(path2)], show_progress=False)
+ with TsFileDataFrame([str(path1), str(path2)], show_progress=False) as
tsdf:
+ series = tsdf["weather.device_a.temperature"]
+ with pytest.raises(ValueError, match="Duplicate timestamp"):
+ _ = series.timestamps
+
+
+def
test_dataset_overlap_position_access_avoids_full_timestamp_materialization(tmp_path,
monkeypatch):
+ path1 = tmp_path / "part1.tsfile"
+ path2 = tmp_path / "part2.tsfile"
+ _write_weather_rows_file(
+ path1,
+ {
+ "time": [0, 2, 4],
+ "device": ["device_a", "device_a", "device_a"],
+ "temperature": [10.0, 30.0, 50.0],
+ "humidity": [100.0, 300.0, 500.0],
+ },
+ )
+ _write_weather_rows_file(
+ path2,
+ {
+ "time": [1, 3, 5],
+ "device": ["device_a", "device_a", "device_a"],
+ "temperature": [20.0, 40.0, 60.0],
+ "humidity": [200.0, 400.0, 600.0],
+ },
+ )
+
+ def fail_merge(*_args, **_kwargs):
+ raise AssertionError("full timestamp merge should not run for overlap
position reads")
+
+ monkeypatch.setattr(dataframe_module, "_merge_field_timestamps",
fail_merge)
+
+ with TsFileDataFrame([str(path1), str(path2)], show_progress=False) as
tsdf:
+ series = tsdf["weather.device_a.temperature"]
+ assert series[0] == 10.0
+ assert series[1] == 20.0
+ assert series[4] == 50.0
+ np.testing.assert_array_equal(series[1:5], np.array([20.0, 30.0, 40.0,
50.0]))
def test_dataset_rejects_data_access_after_close(tmp_path):
@@ -389,9 +446,31 @@ def
test_reader_catalog_shares_device_metadata_and_resolves_paths(tmp_path):
by_ref = reader.get_series_info_by_ref(0, 0)
assert by_ref == by_path
assert by_ref["tag_values"] == {"device": "device_a"}
-
- ts_by_path =
reader.get_series_timestamps("weather.device_a.temperature")
- ts_by_device = reader.get_device_timestamps(0)
- np.testing.assert_array_equal(ts_by_path, ts_by_device)
+ ts_arr, values = reader.read_series_by_ref(0, 0, 100, 102)
+ np.testing.assert_array_equal(ts_arr, np.array([100, 101, 102]))
+ np.testing.assert_array_equal(values, np.array([20.0, 21.5, 23.0]))
finally:
reader.close()
+
+
+def test_series_path_resolution_allows_prefix_tag_values():
+ catalog = MetadataCatalog()
+ table_id = catalog.add_table(
+ "weather",
+ ("site", "device", "region"),
+ (TSDataType.STRING, TSDataType.STRING, TSDataType.STRING),
+ ("temperature",),
+ )
+ device_id = catalog.add_device(table_id, ("site_a", "device_a"), 0, 1)
+ catalog.series_stats_by_ref[(device_id, 0)] = {
+ "length": 1,
+ "min_time": 0,
+ "max_time": 0,
+ "timeline_length": 1,
+ "timeline_min_time": 0,
+ "timeline_max_time": 0,
+ }
+
+ series_path = build_series_path(catalog, device_id, 0)
+ assert series_path == "weather.site_a.device_a.temperature"
+ assert resolve_series_path(catalog, series_path) == (table_id, device_id,
0)
diff --git a/python/tsfile/__init__.py b/python/tsfile/__init__.py
index 31390d9c..e74c5982 100644
--- a/python/tsfile/__init__.py
+++ b/python/tsfile/__init__.py
@@ -20,14 +20,19 @@ import ctypes
import os
import sys
+_pkg_dir = os.path.dirname(os.path.abspath(__file__))
+
if sys.platform == "win32":
- _pkg_dir = os.path.dirname(os.path.abspath(__file__))
os.add_dll_directory(_pkg_dir)
# Preload libtsfile.dll with absolute path to bypass DLL search issues.
# This ensures it's already in memory when .pyd extensions reference it.
_tsfile_dll = os.path.join(_pkg_dir, "libtsfile.dll")
if os.path.isfile(_tsfile_dll):
ctypes.CDLL(_tsfile_dll)
+elif sys.platform == "darwin":
+ _tsfile_dylib = os.path.join(_pkg_dir, "libtsfile.dylib")
+ if os.path.isfile(_tsfile_dylib):
+ ctypes.CDLL(_tsfile_dylib, mode=os.RTLD_GLOBAL)
from .constants import *
from .schema import *
diff --git a/python/tsfile/dataset/dataframe.py
b/python/tsfile/dataset/dataframe.py
index e28a0e5e..baef77c3 100644
--- a/python/tsfile/dataset/dataframe.py
+++ b/python/tsfile/dataset/dataframe.py
@@ -20,6 +20,7 @@
from collections import defaultdict
from dataclasses import dataclass, field
+import heapq
import os
import sys
from typing import Dict, List, Set, Tuple, Union
@@ -40,9 +41,15 @@ DeviceRef = Tuple[object, int]
_QUERY_START = np.iinfo(np.int64).min
_QUERY_END = np.iinfo(np.int64).max
+_DATACLASS_SLOTS = {"slots": True} if sys.version_info >= (3, 10) else {}
+# Overlap position reads use chunked k-way merge. Keep the default chunk small
+# enough to avoid large read amplification for `series[i]` / short slices, but
+# large enough to avoid excessive query_by_row round-trips when overlap spans
+# multiple shards.
+_OVERLAP_ROW_CHUNK_SIZE = 256
-@dataclass(slots=True)
+@dataclass(**_DATACLASS_SLOTS)
class _LogicalIndex:
"""Cross-reader logical mapping for devices and series."""
@@ -64,7 +71,7 @@ class _LogicalIndex:
series_ref_set: Set[SeriesRefKey] = field(default_factory=set)
-@dataclass(slots=True)
+@dataclass(**_DATACLASS_SLOTS)
class _DerivedCache:
"""Merged metadata derived from the logical index."""
@@ -159,25 +166,55 @@ def _register_reader(
def _build_device_entry(refs: List[DeviceRef]) -> dict:
- """Compute per-device time bounds after merging all contributing shards."""
- # [Temporary] It will be replaced by query_by_row and metadata interface
in TsFile
- if len(refs) == 1:
- merged_timestamps = refs[0][0].get_device_timestamps(refs[0][1])
- else:
- merged_timestamps = merge_timestamp_parts(
- [reader.get_device_timestamps(device_id) for reader, device_id in
refs],
- validate_unique=True,
- )
+ """Compute per-device time bounds from cheap metadata only.
+
+ We intentionally do not validate duplicates at the device level because
+ table-model fields do not necessarily share one complete timestamp axis.
+ Duplicate detection stays on the logical-series paths that materialize or
+ merge one field's timestamps.
+ """
+ infos = [reader.get_device_info(device_id) for reader, device_id in refs]
+ min_time = min(info["min_time"] for info in infos)
+ max_time = max(info["max_time"] for info in infos)
+
+ return {
+ "min_time": min_time,
+ "max_time": max_time,
+ }
+
+
+def _build_runtime_series_stats(refs: List[SeriesRef]) -> dict:
+ """Build shared-timeline series stats from native timeline metadata."""
+ min_time = None
+ max_time = None
+ count = 0
+
+ for reader, device_id, field_idx in refs:
+ info = reader.get_series_info_by_ref(device_id, field_idx)
+ shard_min = info["timeline_min_time"]
+ shard_max = info["timeline_max_time"]
+ shard_count = info["timeline_length"]
+
+ if shard_count == 0:
+ continue
+
+ count += shard_count
+ min_time = shard_min if min_time is None else min(min_time, shard_min)
+ max_time = shard_max if max_time is None else max(max_time, shard_max)
return {
- "min_time": int(merged_timestamps[0]) if len(merged_timestamps) > 0
else None,
- "max_time": int(merged_timestamps[-1]) if len(merged_timestamps) > 0
else None,
+ "min_time": min_time,
+ "max_time": max_time,
+ "count": count,
}
def _merge_field_timestamps(series_name: str, refs: List[SeriesRef]) ->
np.ndarray:
"""Load and merge the full timestamp axis for one logical series on
demand."""
- # [Temporary] It will be replaced by query_by_row interface in TsFile
+ # This is intentionally lazy because it is one of the most expensive
dataset
+ # paths: it reads the full timestamp axis for the logical series across all
+ # shards. Today this happens only when callers explicitly ask for
+ # `Timeseries.timestamps`.
time_parts = []
for reader, device_id, field_idx in refs:
ts_arr, _ = reader.read_series_by_ref(device_id, field_idx,
_QUERY_START, _QUERY_END)
@@ -203,17 +240,166 @@ def _merge_field_timestamps(series_name: str, refs:
List[SeriesRef]) -> np.ndarr
return merged_timestamps
+def _read_field_by_position(
+ series_name: str,
+ refs: List[SeriesRef],
+ offset: int,
+ limit: int,
+) -> Tuple[np.ndarray, np.ndarray]:
+ """Read one logical series by global position without materializing
timestamps for non-overlapping shards."""
+ if limit <= 0:
+ return np.array([], dtype=np.int64), np.array([], dtype=np.float64)
+
+ infos = []
+ for reader, device_id, field_idx in refs:
+ series_info = reader.get_series_info_by_ref(device_id, field_idx)
+ infos.append(
+ {
+ "length": series_info["timeline_length"],
+ "min_time": series_info["timeline_min_time"],
+ "max_time": series_info["timeline_max_time"],
+ "table_name": series_info["table_name"],
+ "column_name": series_info["column_name"],
+ "device_id": series_info["device_id"],
+ "field_idx": series_info["field_idx"],
+ "tag_columns": series_info["tag_columns"],
+ "tag_values": series_info["tag_values"],
+ }
+ )
+ ordered = sorted(zip(refs, infos), key=lambda item: (item[1]["min_time"],
item[1]["max_time"]))
+ if _has_time_range_overlap([info for _, info in ordered]):
+ return _read_field_by_position_overlap(series_name, ordered, offset,
limit)
+
+ remaining_offset = offset
+ remaining_limit = limit
+ time_parts = []
+ value_parts = []
+ for (reader, device_id, field_idx), info in ordered:
+ shard_count = info["length"]
+ if remaining_offset >= shard_count:
+ remaining_offset -= shard_count
+ continue
+ local_limit = min(remaining_limit, shard_count - remaining_offset)
+ ts_arr, values = reader.read_series_by_row(device_id, field_idx,
remaining_offset, local_limit)
+ if len(ts_arr) > 0:
+ time_parts.append(ts_arr)
+ value_parts.append(values)
+ remaining_limit -= local_limit
+ remaining_offset = 0
+ if remaining_limit <= 0:
+ break
+
+ if not time_parts:
+ return np.array([], dtype=np.int64), np.array([], dtype=np.float64)
+ return np.concatenate(time_parts), np.concatenate(value_parts)
+
+
+def _has_time_range_overlap(infos: List[dict]) -> bool:
+ previous_max = None
+ for info in infos:
+ if info["min_time"] is None or info["max_time"] is None:
+ continue
+ if previous_max is not None and info["min_time"] <= previous_max:
+ return True
+ previous_max = info["max_time"] if previous_max is None else
max(previous_max, info["max_time"])
+ return False
+
+
+def _read_field_by_position_overlap(
+ series_name: str,
+ ordered: List[Tuple[SeriesRef, dict]],
+ offset: int,
+ limit: int,
+) -> Tuple[np.ndarray, np.ndarray]:
+ """Merge overlapping shard streams lazily until the requested global
window is covered."""
+ total_count = sum(info["length"] for _, info in ordered)
+ if offset >= total_count:
+ return np.array([], dtype=np.int64), np.array([], dtype=np.float64)
+
+ chunk_size = max(_OVERLAP_ROW_CHUNK_SIZE, limit)
+ states = []
+ heap = []
+
+ def fill_state(state_idx: int) -> bool:
+ state = states[state_idx]
+ while state["buffer_index"] >= len(state["timestamps"]):
+ remaining = state["length"] - state["next_offset"]
+ if remaining <= 0:
+ state["exhausted"] = True
+ return False
+
+ local_limit = min(chunk_size, remaining)
+ reader, device_id, field_idx = state["ref"]
+ ts_arr, val_arr = reader.read_series_by_row(device_id, field_idx,
state["next_offset"], local_limit)
+ state["next_offset"] += len(ts_arr)
+ state["timestamps"] = ts_arr
+ state["values"] = val_arr
+ state["buffer_index"] = 0
+ if len(ts_arr) > 0:
+ return True
+
+ state["exhausted"] = True
+ return False
+ return True
+
+ for ref, info in ordered:
+ state_idx = len(states)
+ states.append(
+ {
+ "ref": ref,
+ "length": info["length"],
+ "next_offset": 0,
+ "timestamps": np.array([], dtype=np.int64),
+ "values": np.array([], dtype=np.float64),
+ "buffer_index": 0,
+ "exhausted": False,
+ }
+ )
+ if fill_state(state_idx):
+ heapq.heappush(heap, (int(states[state_idx]["timestamps"][0]),
state_idx))
+
+ skipped = 0
+ output_timestamps = []
+ output_values = []
+ last_timestamp = None
+
+ while heap and len(output_timestamps) < limit:
+ current_ts, state_idx = heapq.heappop(heap)
+ if last_timestamp is not None and current_ts == last_timestamp:
+ raise ValueError(
+ f"Duplicate timestamp {current_ts} found for series
'{series_name}' across shards. "
+ f"Cross-shard duplicate timestamps are not supported."
+ )
+
+ state = states[state_idx]
+ buffer_index = state["buffer_index"]
+ current_value = float(state["values"][buffer_index])
+ state["buffer_index"] += 1
+ if fill_state(state_idx):
+ next_ts = int(state["timestamps"][state["buffer_index"]])
+ heapq.heappush(heap, (next_ts, state_idx))
+
+ last_timestamp = current_ts
+ if skipped < offset:
+ skipped += 1
+ continue
+
+ output_timestamps.append(current_ts)
+ output_values.append(current_value)
+
+ return np.asarray(output_timestamps, dtype=np.int64),
np.asarray(output_values, dtype=np.float64)
+
def _build_field_stats(refs: List[SeriesRef]) -> dict:
- """Aggregate cheap per-shard stats without materializing full series
values."""
+ """Aggregate per-series timeline statistics for dataframe display."""
min_time = None
max_time = None
count = 0
for reader, device_id, field_idx in refs:
info = reader.get_series_info_by_ref(device_id, field_idx)
- shard_min = info["min_time"]
- shard_max = info["max_time"]
- shard_count = info["length"]
+ shard_min = info["timeline_min_time"]
+ shard_max = info["timeline_max_time"]
+ shard_count = info["timeline_length"]
if shard_count == 0:
continue
@@ -437,7 +623,7 @@ class TsFileDataFrame:
table_entry = self._index.table_entries[table_name]
expected_parts = len(table_entry.tag_columns) + 2
- if len(parts) != expected_parts:
+ if len(parts) > expected_parts:
raise KeyError(_series_lookup_hint(series_name))
field_name = parts[-1]
@@ -490,9 +676,12 @@ class TsFileDataFrame:
return Timeseries(
series_name,
self._index.series_ref_map[series_ref],
- self._cache.field_stats[series_ref],
+
_build_runtime_series_stats(self._index.series_ref_map[series_ref]),
self._assert_open,
lambda: _merge_field_timestamps(series_name,
self._index.series_ref_map[series_ref]),
+ lambda offset, limit: _read_field_by_position(
+ series_name, self._index.series_ref_map[series_ref], offset,
limit
+ ),
)
def __getitem__(self, key):
diff --git a/python/tsfile/dataset/metadata.py
b/python/tsfile/dataset/metadata.py
index 5c4611e0..195e3e9f 100644
--- a/python/tsfile/dataset/metadata.py
+++ b/python/tsfile/dataset/metadata.py
@@ -19,18 +19,17 @@
"""Shared metadata models for dataset readers and views."""
from dataclasses import dataclass, field
+import sys
from typing import Any, Dict, Iterable, Iterator, List, Tuple
-import numpy as np
-
from ..constants import TSDataType
_PATH_SEPARATOR = "."
_PATH_ESCAPE = "\\"
+_DATACLASS_SLOTS = {"slots": True} if sys.version_info >= (3, 10) else {}
-
-@dataclass(slots=True)
+@dataclass(**_DATACLASS_SLOTS)
class TableEntry:
"""Schema-level metadata shared by every device in one table."""
@@ -49,7 +48,7 @@ class TableEntry:
return self._field_index_by_name[field_name]
-@dataclass(slots=True)
+@dataclass(**_DATACLASS_SLOTS)
class DeviceEntry:
"""One logical device identified by table_id + ordered tag values.
@@ -58,13 +57,11 @@ class DeviceEntry:
table_id: int
tag_values: Tuple[Any, ...]
- timestamps: np.ndarray
- length: int
min_time: int
max_time: int
-@dataclass(slots=True)
+@dataclass(**_DATACLASS_SLOTS)
class MetadataCatalog:
"""Canonical metadata store shared by dataset readers and dataframes."""
@@ -72,6 +69,7 @@ class MetadataCatalog:
device_entries: List[DeviceEntry] = field(default_factory=list)
table_id_by_name: Dict[str, int] = field(default_factory=dict)
device_id_by_key: Dict[Tuple[int, tuple], int] =
field(default_factory=dict)
+ series_stats_by_ref: Dict[Tuple[int, int], Dict[str, int]] =
field(default_factory=dict)
def add_table(
self,
@@ -92,24 +90,24 @@ class MetadataCatalog:
self.table_id_by_name[table_name] = table_id
return table_id
- def add_device(self, table_id: int, tag_values: tuple, timestamps:
np.ndarray) -> int:
+ def add_device(
+ self,
+ table_id: int,
+ tag_values: tuple,
+ min_time: int,
+ max_time: int,
+ ) -> int:
key = (table_id, tuple(tag_values))
if key in self.device_id_by_key:
return self.device_id_by_key[key]
- timestamps = np.sort(timestamps)
- if len(timestamps) == 0:
- raise ValueError("Cannot register a device without timestamps.")
-
device_id = len(self.device_entries)
self.device_entries.append(
DeviceEntry(
table_id=table_id,
tag_values=tuple(tag_values),
- timestamps=timestamps,
- length=len(timestamps),
- min_time=int(timestamps[0]),
- max_time=int(timestamps[-1]),
+ min_time=min_time,
+ max_time=max_time,
)
)
self.device_id_by_key[key] = device_id
@@ -190,7 +188,7 @@ def resolve_series_path(catalog: MetadataCatalog,
series_path: str) -> Tuple[int
table_id = catalog.table_id_by_name[table_name]
table_entry = catalog.table_entries[table_id]
expected_parts = len(table_entry.tag_columns) + 2
- if len(parts) != expected_parts:
+ if len(parts) > expected_parts:
raise ValueError(f"Series not found: {series_path}")
field_name = parts[-1]
diff --git a/python/tsfile/dataset/reader.py b/python/tsfile/dataset/reader.py
index d7358023..953a22cb 100644
--- a/python/tsfile/dataset/reader.py
+++ b/python/tsfile/dataset/reader.py
@@ -23,9 +23,9 @@ import sys
from typing import Dict, Iterator, List, Tuple
import numpy as np
-import pyarrow.compute as pc
from ..constants import ColumnCategory, TSDataType
+from ..tag_filter import tag_eq
from ..tsfile_reader import TsFileReaderPy
from .metadata import MetadataCatalog, build_series_path, iter_series_refs,
resolve_series_path
@@ -44,6 +44,14 @@ def _to_python_scalar(value):
return value.item() if hasattr(value, "item") else value
+def _build_exact_tag_filter(tag_values: Dict[str, object]):
+ tag_filter = None
+ for tag_column, tag_value in tag_values.items():
+ expr = tag_eq(tag_column, str(tag_value))
+ tag_filter = expr if tag_filter is None else tag_filter & expr
+ return tag_filter
+
+
class TsFileSeriesReader:
"""Wrap ``TsFileReaderPy`` with numeric dataset discovery and batch
reads."""
@@ -103,14 +111,14 @@ class TsFileSeriesReader:
) from e
def _cache_metadata_table_model(self):
- """Build the in-memory catalog by scanning table batches from the
file."""
+ """Build the in-memory catalog from table schemas and native
metadata."""
table_schemas = self._reader.get_all_table_schemas()
if not table_schemas:
raise ValueError("No tables found in TsFile")
self._catalog = MetadataCatalog()
- total_rows = 0
table_names = list(table_schemas.keys())
+ metadata_groups = self._reader.get_timeseries_metadata(None)
for table_index, table_name in enumerate(table_names):
table_schema = table_schemas[table_name]
@@ -138,86 +146,111 @@ class TsFileSeriesReader:
continue
table_id = self._catalog.add_table(table_name, tag_columns,
tag_types, field_columns)
- time_arrays = []
- tag_arrays = {tag_column: [] for tag_column in tag_columns}
-
- # [Temporary] It will be replaced by new tsfile api, we won't
query all the data later.
- query_columns = tag_columns + field_columns
-
- with self._reader.query_table(table_name, query_columns,
batch_size=65536) as result_set:
- while True:
- arrow_table = result_set.read_arrow_batch()
- if arrow_table is None:
- break
- batch_rows = arrow_table.num_rows
- total_rows += batch_rows
- time_arrays.append(arrow_table.column("time").to_numpy())
- for tag_column in tag_columns:
-
tag_arrays[tag_column].append(arrow_table.column(tag_column).to_numpy())
-
- if self.show_progress:
- sys.stderr.write(
- f"\rReading TsFile metadata: table {table_index +
1}/{len(table_names)} "
- f"[{table_name}] ({total_rows:,} rows)"
- )
- sys.stderr.flush()
-
- if not time_arrays:
- continue
-
- timestamps = np.concatenate(time_arrays).astype(np.int64)
- if not tag_columns:
- self._add_device(table_id, (), timestamps)
- continue
-
- for tag_values, device_timestamps in
self._iter_device_groups(tag_columns, timestamps, tag_arrays):
- self._add_device(table_id, tag_values, device_timestamps)
-
- if self.show_progress and total_rows > 0:
+ table_groups = [
+ group
+ for group in metadata_groups.values()
+ if (group.table_name or "").lower() == table_name.lower()
+ ]
+ table_groups.sort(key=lambda group: tuple("" if value is None else
str(value) for value in group.segments))
+
+ for group in table_groups:
+ stats = self._metadata_device_stats(group)
+ if stats is None:
+ continue
+ tag_values = self._metadata_tag_values(group, len(tag_columns))
+ device_id = self._add_device(table_id, tag_values,
stats["min_time"], stats["max_time"])
+
+ stats_by_field = self._metadata_field_stats(group)
+ table_entry = self._catalog.table_entries[table_id]
+ for field_idx, field_name in
enumerate(table_entry.field_columns):
+ field_stats = stats_by_field.get(field_name)
+ if field_stats is None:
+ self._catalog.series_stats_by_ref[(device_id,
field_idx)] = {
+ "length": 0,
+ "min_time": None,
+ "max_time": None,
+ "timeline_length": 0,
+ "timeline_min_time": None,
+ "timeline_max_time": None,
+ }
+ else:
+ self._catalog.series_stats_by_ref[(device_id,
field_idx)] = field_stats
+
+ if self.show_progress:
+ sys.stderr.write(
+ f"\rReading TsFile metadata: table {table_index +
1}/{len(table_names)} "
+ f"[{table_name}]"
+ )
+ sys.stderr.flush()
+
+ if self.show_progress and self.series_count > 0:
sys.stderr.write(
- f"\rReading TsFile metadata: {len(table_names)} table(s),
{total_rows:,} rows, "
- f"{self.series_count} series ... done\n"
+ f"\rReading TsFile metadata: {len(table_names)} table(s),
{self.series_count} series ... done\n"
)
sys.stderr.flush()
if self.series_count == 0:
raise ValueError("No valid numeric series found in TsFile")
- def _iter_device_groups(
- self,
- tag_columns: List[str],
- timestamps: np.ndarray,
- tag_arrays: Dict[str, list],
- ) -> Iterator[Tuple[tuple, np.ndarray]]:
- """Group one table's rows by tag tuple while preserving original row
membership."""
- tag_values_by_column = {column: np.concatenate(tag_arrays[column]) for
column in tag_columns}
-
- n = len(timestamps)
- arrays = [tag_values_by_column[col] for col in tag_columns]
- dtype = np.dtype([(col, arrays[i].dtype) for i, col in
enumerate(tag_columns)])
- composite = np.empty(n, dtype=dtype)
- for i, col in enumerate(tag_columns):
- composite[col] = arrays[i]
-
- _, inverse, counts = np.unique(composite, return_inverse=True,
return_counts=True)
- ordered_indices = np.argsort(inverse, kind="stable")
- group_bounds = np.cumsum(counts)[:-1]
- for group_indices in np.split(ordered_indices, group_bounds):
- first = int(group_indices[0])
- tag_tuple = tuple(_to_python_scalar(composite[col][first]) for col
in tag_columns)
- yield tag_tuple, timestamps[group_indices]
+ @staticmethod
+ def _metadata_device_stats(group) -> dict:
+ """Derive cheap device-level metadata hints from native field
statistics.
+
+ Callers must treat them as pruning/display hints rather than exact
+ logical-series timeline semantics.
+ """
+ statistics = [
+ timeseries.timeline_statistic
+ for timeseries in group.timeseries
+ if timeseries.timeline_statistic.has_statistic and
timeseries.timeline_statistic.row_count > 0
+ ]
+ if not statistics:
+ return None
+
+ return {
+ "min_time": min(int(statistic.start_time) for statistic in
statistics),
+ "max_time": max(int(statistic.end_time) for statistic in
statistics),
+ }
+
+ @staticmethod
+ def _metadata_tag_values(group, tag_count: int) -> tuple:
+ """Extract ordered table tag values from IDeviceID segments.
+
+ A table-model DeviceID may only materialize a prefix of the declared
+ tag columns. Preserve the available prefix rather than requiring a
+ full-length tag tuple here.
+ """
+ if tag_count == 0:
+ return ()
+ return tuple(group.segments[1 : min(len(group.segments), 1 +
tag_count)])
+
+ @staticmethod
+ def _metadata_field_stats(group) -> Dict[str, dict]:
+ stats = {}
+ for timeseries in group.timeseries:
+ statistic = timeseries.statistic
+ timeline_statistic = timeseries.timeline_statistic
+ if not timeline_statistic.has_statistic or
timeline_statistic.row_count <= 0:
+ continue
+ stats[timeseries.measurement_name] = {
+ "length": int(statistic.row_count) if statistic.has_statistic
else 0,
+ "min_time": int(statistic.start_time) if
statistic.has_statistic else None,
+ "max_time": int(statistic.end_time) if statistic.has_statistic
else None,
+ "timeline_length": int(timeline_statistic.row_count),
+ "timeline_min_time": int(timeline_statistic.start_time),
+ "timeline_max_time": int(timeline_statistic.end_time),
+ }
+ return stats
def _add_device(
self,
table_id: int,
tag_values: tuple,
- timestamps: np.ndarray,
+ min_time: int,
+ max_time: int,
):
"""Add one device to the catalog."""
- if len(timestamps) == 0:
- return
-
- self._catalog.add_device(table_id, tag_values, timestamps)
+ return self._catalog.add_device(table_id, tag_values, min_time,
max_time)
def _resolve_series_path(self, series_path: str) -> Tuple[int, int, int]:
return resolve_series_path(self._catalog, series_path)
@@ -236,20 +269,20 @@ class TsFileSeriesReader:
"table_name": table_entry.table_name,
"tag_columns": table_entry.tag_columns,
"tag_values": dict(zip(table_entry.tag_columns,
device_entry.tag_values)),
- "length": device_entry.length,
"min_time": device_entry.min_time,
"max_time": device_entry.max_time,
}
- def get_device_timestamps(self, device_id: int) -> np.ndarray:
- return self._catalog.device_entries[device_id].timestamps
-
def get_series_info_by_ref(self, device_id: int, field_idx: int) -> dict:
table_entry, device_entry, field_name =
self._resolve_series_ref(device_id, field_idx)
+ field_stats = self._catalog.series_stats_by_ref[(device_id, field_idx)]
return {
- "length": device_entry.length,
- "min_time": device_entry.min_time,
- "max_time": device_entry.max_time,
+ "length": field_stats["length"],
+ "min_time": field_stats["min_time"],
+ "max_time": field_stats["max_time"],
+ "timeline_length": field_stats["timeline_length"],
+ "timeline_min_time": field_stats["timeline_min_time"],
+ "timeline_max_time": field_stats["timeline_max_time"],
"table_name": table_entry.table_name,
"column_name": field_name,
"device_id": device_id,
@@ -262,10 +295,6 @@ class TsFileSeriesReader:
device_id, field_idx = self._resolve_series_path(series_path)[1:]
return self.get_series_info_by_ref(device_id, field_idx)
- def get_series_timestamps(self, series_path: str) -> np.ndarray:
- device_id = self._resolve_series_path(series_path)[1]
- return self.get_device_timestamps(device_id)
-
def read_series_by_ref(self, device_id: int, field_idx: int, start_time:
int, end_time: int) -> Tuple[np.ndarray, np.ndarray]:
table_entry, _, field_name = self._resolve_series_ref(device_id,
field_idx)
timestamps, field_values =
self.read_device_fields_by_time_range(device_id, [field_idx], start_time,
end_time)
@@ -277,6 +306,28 @@ class TsFileSeriesReader:
_, device_id, field_idx = self._resolve_series_path(series_path)
return self.read_series_by_ref(device_id, field_idx, start_time,
end_time)
+ def read_series_by_row(self, device_id: int, field_idx: int, offset: int,
limit: int) -> Tuple[np.ndarray, np.ndarray]:
+ """Read one logical series by device-local row offset/limit."""
+ table_entry, device_entry, field_name =
self._resolve_series_ref(device_id, field_idx)
+ tag_values = dict(zip(table_entry.tag_columns,
device_entry.tag_values))
+ tag_filter = _build_exact_tag_filter(tag_values) if tag_values else
None
+
+ timestamps = []
+ values = []
+ with self._reader.query_table_by_row(
+ table_entry.table_name,
+ [field_name],
+ offset=offset,
+ limit=limit,
+ tag_filter=tag_filter,
+ ) as result_set:
+ while result_set.next():
+ timestamps.append(result_set.get_value_by_name("time"))
+ value = result_set.get_value_by_name(field_name)
+ values.append(np.nan if value is None else float(value))
+
+ return np.asarray(timestamps, dtype=np.int64), np.asarray(values,
dtype=np.float64)
+
def read_device_fields_by_time_range(
self, device_id: int, field_indices: List[int], start_time: int,
end_time: int
) -> Tuple[np.ndarray, Dict[str, np.ndarray]]:
@@ -303,18 +354,20 @@ class TsFileSeriesReader:
start_time: int,
end_time: int,
) -> Tuple[np.ndarray, Dict[str, np.ndarray]]:
- """Execute the underlying table query, then apply tag filtering
client-side."""
+ """Execute the underlying table query with exact tag filter
pushdown."""
tag_columns = list(tag_columns)
field_columns = list(field_columns)
- query_columns = tag_columns + field_columns if tag_columns else
list(field_columns)
+ query_columns = list(field_columns)
timestamp_parts = []
field_parts = {field_column: [] for field_column in field_columns}
+ tag_filter = _build_exact_tag_filter(tag_values) if tag_values else
None
with self._reader.query_table(
table_name,
query_columns,
start_time=start_time,
end_time=end_time,
+ tag_filter=tag_filter,
batch_size=65536,
) as result_set:
while True:
@@ -322,13 +375,6 @@ class TsFileSeriesReader:
if arrow_table is None:
break
- if tag_values:
- mask = None
- for tag_column, tag_value in tag_values.items():
- column_mask = pc.equal(arrow_table.column(tag_column),
tag_value)
- mask = column_mask if mask is None else pc.and_(mask,
column_mask)
- arrow_table = arrow_table.filter(mask)
-
if arrow_table.num_rows == 0:
continue
diff --git a/python/tsfile/dataset/timeseries.py
b/python/tsfile/dataset/timeseries.py
index 35f1614d..97a92d99 100644
--- a/python/tsfile/dataset/timeseries.py
+++ b/python/tsfile/dataset/timeseries.py
@@ -70,12 +70,14 @@ class Timeseries:
stats: dict,
ensure_open: Callable[[], None],
load_timestamps: Callable[[], np.ndarray],
+ read_by_position: Callable[[int, int], Tuple[np.ndarray, np.ndarray]],
):
self._name = name
self._series_refs = series_refs
self._stats = dict(stats)
self._ensure_open = ensure_open
self._load_timestamps = load_timestamps
+ self._read_by_position = read_by_position
self._timestamps = None
@property
@@ -101,32 +103,37 @@ class Timeseries:
return self._stats["count"]
def __getitem__(self, key):
- timestamps = self.timestamps
- length = len(timestamps)
+ self._ensure_open()
+ length = len(self)
if isinstance(key, int):
if key < 0:
key += length
if key < 0 or key >= length:
raise IndexError(f"Index {key} out of range [0, {length})")
- ts = int(timestamps[key])
- _, values = self._query_time_range(ts, ts)
+ _, values = self._read_by_position(key, 1)
return float(values[0]) if len(values) > 0 else None
if isinstance(key, slice):
- requested_ts = timestamps[key]
- if len(requested_ts) == 0:
+ start, stop, step = key.indices(length)
+ if (step > 0 and start >= stop) or (step < 0 and start <= stop):
return np.array([], dtype=np.float64)
- ts_arr, values = self._query_time_range(int(np.min(requested_ts)),
int(np.max(requested_ts)))
- result = np.full(len(requested_ts), np.nan)
- if len(ts_arr) > 0:
- indices = np.searchsorted(ts_arr, requested_ts)
- valid = (indices < len(ts_arr)) & (
- ts_arr[np.minimum(indices, len(ts_arr) - 1)] ==
requested_ts
- )
- result[valid] = values[indices[valid]]
- return result
+ # The common case is a forward contiguous slice like [:], [a:b], or
+ # [a:b:1]. Avoid materializing the full position list for large
+ # series; read the window directly.
+ if step == 1:
+ _, values = self._read_by_position(start, stop - start)
+ return values
+
+ positions = np.arange(start, stop, step, dtype=np.int64)
+ min_pos = int(positions.min())
+ max_pos = int(positions.max())
+ _, values = self._read_by_position(min_pos, max_pos - min_pos + 1)
+ if len(values) == 0:
+ return np.array([], dtype=np.float64)
+ relative = positions - min_pos
+ return values[relative]
raise TypeError(f"Unsupported key type: {type(key)}")
@@ -135,8 +142,8 @@ class Timeseries:
time_parts = []
value_parts = []
for reader, device_id, field_idx in self._series_refs:
- device_timestamps = reader.get_device_timestamps(device_id)
- if device_timestamps[-1] < start_time or device_timestamps[0] >
end_time:
+ device_info = reader.get_device_info(device_id)
+ if device_info["max_time"] < start_time or device_info["min_time"]
> end_time:
continue
ts_arr, val_arr = reader.read_series_by_ref(device_id, field_idx,
start_time, end_time)
if len(ts_arr) > 0:
diff --git a/python/tsfile/schema.py b/python/tsfile/schema.py
index ce85f683..88ba1b60 100644
--- a/python/tsfile/schema.py
+++ b/python/tsfile/schema.py
@@ -99,6 +99,7 @@ class TimeseriesMetadata:
data_type: TSDataType
chunk_meta_count: int
statistic: TimeseriesStatisticType
+ timeline_statistic: TimeseriesStatisticType
@dataclass(frozen=True)
diff --git a/python/tsfile/tsfile_cpp.pxd b/python/tsfile/tsfile_cpp.pxd
index 71d60250..0fa570df 100644
--- a/python/tsfile/tsfile_cpp.pxd
+++ b/python/tsfile/tsfile_cpp.pxd
@@ -159,6 +159,7 @@ cdef extern from "cwrapper/tsfile_cwrapper.h":
TSDataType data_type
int32_t chunk_meta_count
TimeseriesStatistic statistic
+ TimeseriesStatistic timeline_statistic
ctypedef struct DeviceID:
char * path
diff --git a/python/tsfile/tsfile_py_cpp.pyx b/python/tsfile/tsfile_py_cpp.pyx
index 67ea9b34..70518b70 100644
--- a/python/tsfile/tsfile_py_cpp.pyx
+++ b/python/tsfile/tsfile_py_cpp.pyx
@@ -1034,11 +1034,13 @@ cdef object
timeseries_metadata_c_to_py(TimeseriesMetadata* m):
else:
name_py = m.measurement_name.decode('utf-8')
cdef object stat = timeseries_statistic_c_to_py(&m.statistic)
+ cdef object timeline_stat =
timeseries_statistic_c_to_py(&m.timeline_statistic)
return TimeseriesMetadataPy(
name_py,
TSDataTypePy(m.data_type),
int(m.chunk_meta_count),
stat,
+ timeline_stat,
)
cdef tuple c_device_segments_to_tuple(char** segs, uint32_t n):
diff --git a/python/tsfile/tsfile_reader.pyx b/python/tsfile/tsfile_reader.pyx
index dbc598f3..9193e2c6 100644
--- a/python/tsfile/tsfile_reader.pyx
+++ b/python/tsfile/tsfile_reader.pyx
@@ -445,6 +445,7 @@ cdef class TsFileReaderPy:
[column_name.lower() for
column_name in column_names],
offset, limit,
c_tag_filter, batch_size)
pyresult = ResultSetPy(self)
+ pyresult._tag_filter_handle = c_tag_filter
pyresult.init_c(result, table_name)
self.activate_result_set_list.add(pyresult)
return pyresult