This is an automated email from the ASF dual-hosted git repository.

colinlee pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9374f727 Remove timestamps cache to optimize init phase (#769)
9374f727 is described below

commit 9374f72709d6225598ff7517d0106351c35d647b
Author: YangCaiyin <[email protected]>
AuthorDate: Thu Apr 9 17:52:18 2026 +0800

    Remove timestamps cache to optimize init phase (#769)
    
    * remove timestamps cache to optimize init phase
    
    * add timeline statistics
    
    * add timeline statistics
    
    * fix CI
    
    * spotless
    
    * spotless
    
    * fix by review
    
    * change timeline statistics type
---
 cpp/src/cwrapper/tsfile_cwrapper.cc  | 113 ++++++++++++++++-
 cpp/src/cwrapper/tsfile_cwrapper.h   |   1 +
 cpp/src/file/tsfile_io_reader.cc     |  53 +++++++-
 cpp/src/reader/tsfile_reader.cc      |  41 +++++--
 cpp/src/reader/tsfile_reader.h       |   2 +
 python/tests/test_reader_metadata.py |  64 +++++++++-
 python/tests/test_tsfile_dataset.py  |  93 ++++++++++++--
 python/tsfile/__init__.py            |   7 +-
 python/tsfile/dataset/dataframe.py   | 229 ++++++++++++++++++++++++++++++++---
 python/tsfile/dataset/metadata.py    |  34 +++---
 python/tsfile/dataset/reader.py      | 224 ++++++++++++++++++++--------------
 python/tsfile/dataset/timeseries.py  |  41 ++++---
 python/tsfile/schema.py              |   1 +
 python/tsfile/tsfile_cpp.pxd         |   1 +
 python/tsfile/tsfile_py_cpp.pyx      |   2 +
 python/tsfile/tsfile_reader.pyx      |   1 +
 16 files changed, 737 insertions(+), 170 deletions(-)

diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc 
b/cpp/src/cwrapper/tsfile_cwrapper.cc
index e1b5617a..99db6104 100644
--- a/cpp/src/cwrapper/tsfile_cwrapper.cc
+++ b/cpp/src/cwrapper/tsfile_cwrapper.cc
@@ -970,6 +970,81 @@ int fill_timeseries_statistic(storage::Statistic* st,
     return common::E_OK;
 }
 
+int fill_timeline_statistic(storage::ITimeseriesIndex* idx,
+                            TimeseriesStatistic* out) {
+    clear_timeseries_statistic(out);
+    if (idx == nullptr) {
+        return common::E_OK;
+    }
+
+    auto* aligned_idx = dynamic_cast<storage::AlignedTimeseriesIndex*>(idx);
+    if (aligned_idx != nullptr && aligned_idx->time_ts_idx_ != nullptr &&
+        aligned_idx->time_ts_idx_->get_statistic() != nullptr) {
+        auto* st = aligned_idx->time_ts_idx_->get_statistic();
+        TsFileStatisticBase* b = tsfile_statistic_base(out);
+        b->has_statistic = true;
+        b->type = TS_DATATYPE_VECTOR;
+        b->row_count = st->get_count();
+        b->start_time = st->start_time_;
+        b->end_time = st->get_end_time();
+        return common::E_OK;
+    }
+
+    if (idx->get_statistic() != nullptr &&
+        idx->get_time_chunk_meta_list() == nullptr) {
+        auto* st = idx->get_statistic();
+        TsFileStatisticBase* b = tsfile_statistic_base(out);
+        b->has_statistic = true;
+        b->type = TS_DATATYPE_VECTOR;
+        b->row_count = st->get_count();
+        b->start_time = st->start_time_;
+        b->end_time = st->get_end_time();
+        return common::E_OK;
+    }
+
+    auto* list = idx->get_time_chunk_meta_list();
+    if (list == nullptr) {
+        list = idx->get_chunk_meta_list();
+    }
+    if (list == nullptr) {
+        return common::E_OK;
+    }
+
+    int64_t row_count = 0;
+    int64_t start_time = 0;
+    int64_t end_time = 0;
+    bool has_statistic = false;
+    for (auto it = list->begin(); it != list->end(); it++) {
+        auto* chunk_meta = it.get();
+        if (chunk_meta == nullptr || chunk_meta->statistic_ == nullptr ||
+            chunk_meta->statistic_->count_ <= 0) {
+            continue;
+        }
+        if (!has_statistic) {
+            start_time = chunk_meta->statistic_->start_time_;
+            end_time = chunk_meta->statistic_->end_time_;
+            has_statistic = true;
+        } else {
+            start_time =
+                std::min(start_time, chunk_meta->statistic_->start_time_);
+            end_time = std::max(end_time, chunk_meta->statistic_->end_time_);
+        }
+        row_count += chunk_meta->statistic_->count_;
+    }
+
+    if (!has_statistic) {
+        return common::E_OK;
+    }
+
+    TsFileStatisticBase* b = tsfile_statistic_base(out);
+    b->has_statistic = true;
+    b->type = TS_DATATYPE_VECTOR;
+    b->row_count = row_count;
+    b->start_time = start_time;
+    b->end_time = end_time;
+    return common::E_OK;
+}
+
 void free_device_timeseries_metadata_entries_partial(
     DeviceTimeseriesMetadataEntry* entries, size_t filled_count) {
     if (entries == nullptr) {
@@ -981,6 +1056,8 @@ void free_device_timeseries_metadata_entries_partial(
             for (uint32_t j = 0; j < entries[i].timeseries_count; j++) {
                 free_timeseries_statistic_heap(
                     &entries[i].timeseries[j].statistic);
+                free_timeseries_statistic_heap(
+                    &entries[i].timeseries[j].timeline_statistic);
                 free(entries[i].timeseries[j].measurement_name);
             }
             free(entries[i].timeseries);
@@ -1139,10 +1216,19 @@ ERRNO populate_c_metadata_map_from_cpp(
                 free_device_timeseries_metadata_entries_partial(entries, di);
                 return common::E_OOM;
             }
-            m.data_type = static_cast<TSDataType>(idx->get_data_type());
+            auto* aligned_idx =
+                dynamic_cast<storage::AlignedTimeseriesIndex*>(idx.get());
+            if (aligned_idx != nullptr &&
+                aligned_idx->value_ts_idx_ != nullptr) {
+                m.data_type = static_cast<TSDataType>(
+                    aligned_idx->value_ts_idx_->get_data_type());
+            } else {
+                m.data_type = static_cast<TSDataType>(idx->get_data_type());
+            }
             storage::Statistic* st = idx->get_statistic();
             int32_t chunk_cnt = 0;
-            auto* cl = idx->get_chunk_meta_list();
+            auto* cl = aligned_idx != nullptr ? 
idx->get_value_chunk_meta_list()
+                                              : idx->get_chunk_meta_list();
             if (cl != nullptr) {
                 chunk_cnt = static_cast<int32_t>(cl->size());
             }
@@ -1151,9 +1237,12 @@ ERRNO populate_c_metadata_map_from_cpp(
             if (st_rc != common::E_OK) {
                 for (uint32_t u = 0; u < slot; u++) {
                     free_timeseries_statistic_heap(&e.timeseries[u].statistic);
+                    free_timeseries_statistic_heap(
+                        &e.timeseries[u].timeline_statistic);
                     free(e.timeseries[u].measurement_name);
                 }
                 free_timeseries_statistic_heap(&m.statistic);
+                free_timeseries_statistic_heap(&m.timeline_statistic);
                 free(m.measurement_name);
                 free(e.timeseries);
                 e.timeseries = nullptr;
@@ -1161,6 +1250,24 @@ ERRNO populate_c_metadata_map_from_cpp(
                 free_device_timeseries_metadata_entries_partial(entries, di);
                 return st_rc;
             }
+            const int timeline_st_rc =
+                fill_timeline_statistic(idx.get(), &m.timeline_statistic);
+            if (timeline_st_rc != common::E_OK) {
+                for (uint32_t u = 0; u < slot; u++) {
+                    free_timeseries_statistic_heap(&e.timeseries[u].statistic);
+                    free_timeseries_statistic_heap(
+                        &e.timeseries[u].timeline_statistic);
+                    free(e.timeseries[u].measurement_name);
+                }
+                free_timeseries_statistic_heap(&m.statistic);
+                free_timeseries_statistic_heap(&m.timeline_statistic);
+                free(m.measurement_name);
+                free(e.timeseries);
+                e.timeseries = nullptr;
+                clear_metadata_entry_device_only(&e);
+                free_device_timeseries_metadata_entries_partial(entries, di);
+                return timeline_st_rc;
+            }
             slot++;
         }
         di++;
@@ -1591,4 +1698,4 @@ ResultSet tsfile_query_table_with_tag_filter(
 
 #ifdef __cplusplus
 }
-#endif
\ No newline at end of file
+#endif
diff --git a/cpp/src/cwrapper/tsfile_cwrapper.h 
b/cpp/src/cwrapper/tsfile_cwrapper.h
index aa7fd029..ae3e28ee 100644
--- a/cpp/src/cwrapper/tsfile_cwrapper.h
+++ b/cpp/src/cwrapper/tsfile_cwrapper.h
@@ -193,6 +193,7 @@ typedef struct TimeseriesMetadata {
     TSDataType data_type;
     int32_t chunk_meta_count;
     TimeseriesStatistic statistic;
+    TimeseriesStatistic timeline_statistic;
 } TimeseriesMetadata;
 
 /**
diff --git a/cpp/src/file/tsfile_io_reader.cc b/cpp/src/file/tsfile_io_reader.cc
index 31d4a303..e96008a4 100644
--- a/cpp/src/file/tsfile_io_reader.cc
+++ b/cpp/src/file/tsfile_io_reader.cc
@@ -96,14 +96,61 @@ int 
TsFileIOReader::get_device_timeseries_meta_without_chunk_meta(
     int64_t end_offset;
     std::vector<std::pair<std::shared_ptr<IMetaIndexEntry>, int64_t>>
         meta_index_entry_list;
+    std::shared_ptr<MetaIndexNode> top_node;
+    bool is_aligned = false;
+    TimeseriesIndex* time_timeseries_index = nullptr;
     if (RET_FAIL(load_device_index_entry(
             std::make_shared<DeviceIDComparable>(device_id), meta_index_entry,
             end_offset))) {
-    } else if (RET_FAIL(load_all_measurement_index_entry(
-                   meta_index_entry->get_offset(), end_offset, pa,
-                   meta_index_entry_list))) {
+    } else {
+        int64_t start_offset = meta_index_entry->get_offset();
+        ASSERT(start_offset < end_offset);
+        const int32_t read_size = end_offset - start_offset;
+        int32_t ret_read_len = 0;
+        char* data_buf = (char*)pa.alloc(read_size);
+        void* m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode));
+        if (IS_NULL(data_buf) || IS_NULL(m_idx_node_buf)) {
+            return E_OOM;
+        }
+        auto* top_node_ptr = new (m_idx_node_buf) MetaIndexNode(&pa);
+        top_node = std::shared_ptr<MetaIndexNode>(top_node_ptr,
+                                                  MetaIndexNode::self_deleter);
+        if (RET_FAIL(read_file_->read(start_offset, data_buf, read_size,
+                                      ret_read_len))) {
+        } else if (RET_FAIL(top_node->deserialize_from(data_buf, read_size))) {
+        } else {
+            is_aligned = is_aligned_device(top_node);
+            if (is_aligned) {
+                if (RET_FAIL(get_time_column_metadata(
+                        top_node, time_timeseries_index, pa))) {
+                    return ret;
+                }
+            }
+        }
+    }
+    if (RET_FAIL(ret)) {
+        return ret;
+    }
+    if (RET_FAIL(load_all_measurement_index_entry(
+            meta_index_entry->get_offset(), end_offset, pa,
+            meta_index_entry_list))) {
     } else if (RET_FAIL(do_load_all_timeseries_index(meta_index_entry_list, pa,
                                                      timeseries_indexs))) {
+    } else if (is_aligned && time_timeseries_index != nullptr) {
+        for (size_t i = 0; i < timeseries_indexs.size(); i++) {
+            void* buf = pa.alloc(sizeof(AlignedTimeseriesIndex));
+            if (IS_NULL(buf)) {
+                return E_OOM;
+            }
+            auto* aligned_ts_idx = new (buf) AlignedTimeseriesIndex;
+            aligned_ts_idx->time_ts_idx_ = time_timeseries_index;
+            aligned_ts_idx->value_ts_idx_ =
+                dynamic_cast<TimeseriesIndex*>(timeseries_indexs[i]);
+            if (aligned_ts_idx->value_ts_idx_ == nullptr) {
+                return E_TYPE_NOT_MATCH;
+            }
+            timeseries_indexs[i] = aligned_ts_idx;
+        }
     }
     return ret;
 }
diff --git a/cpp/src/reader/tsfile_reader.cc b/cpp/src/reader/tsfile_reader.cc
index ea7b2cc0..cabf02b0 100644
--- a/cpp/src/reader/tsfile_reader.cc
+++ b/cpp/src/reader/tsfile_reader.cc
@@ -29,7 +29,8 @@ namespace storage {
 TsFileReader::TsFileReader()
     : read_file_(nullptr),
       tsfile_executor_(nullptr),
-      table_query_executor_(nullptr) {
+      table_query_executor_(nullptr),
+      table_query_executor_batch_size_(0) {
     tsfile_reader_meta_pa_.init(512, MOD_TSFILE_READER);
 }
 
@@ -57,6 +58,7 @@ int TsFileReader::close() {
         delete table_query_executor_;
         table_query_executor_ = nullptr;
     }
+    table_query_executor_batch_size_ = 0;
     if (read_file_ != nullptr) {
         read_file_->close();
         delete read_file_;
@@ -65,6 +67,22 @@ int TsFileReader::close() {
     return ret;
 }
 
+int TsFileReader::ensure_table_query_executor(int batch_size) {
+    if (table_query_executor_ != nullptr &&
+        table_query_executor_batch_size_ == batch_size) {
+        return E_OK;
+    }
+
+    if (table_query_executor_ != nullptr) {
+        delete table_query_executor_;
+        table_query_executor_ = nullptr;
+    }
+
+    table_query_executor_ = new TableQueryExecutor(read_file_, batch_size);
+    table_query_executor_batch_size_ = batch_size;
+    return E_OK;
+}
+
 int TsFileReader::query(QueryExpression* qe, ResultSet*& ret_qds) {
     return tsfile_executor_->execute(qe, ret_qds);
 }
@@ -110,9 +128,7 @@ int TsFileReader::query(const std::string& table_name,
     }
 
     Filter* time_filter = new TimeBetween(start_time, end_time, false);
-    if (table_query_executor_ == nullptr) {
-        table_query_executor_ = new TableQueryExecutor(read_file_, batch_size);
-    }
+    ensure_table_query_executor(batch_size);
     ret = table_query_executor_->query(to_lower(table_name), columns_names,
                                        time_filter, tag_filter, nullptr,
                                        result_set);
@@ -147,9 +163,7 @@ int TsFileReader::queryByRow(const std::string& table_name,
         return E_TABLE_NOT_EXIST;
     }
 
-    if (table_query_executor_ == nullptr) {
-        table_query_executor_ = new TableQueryExecutor(read_file_, batch_size);
-    }
+    ensure_table_query_executor(batch_size);
     ret = table_query_executor_->query(to_lower(table_name), column_names,
                                        /*time_filter=*/nullptr, tag_filter,
                                        /*field_filter=*/nullptr, offset, limit,
@@ -228,9 +242,7 @@ int TsFileReader::query_table_on_tree(
         columns_names[i] = "col_" + std::to_string(i);
     }
     Filter* time_filter = new TimeBetween(star_time, end_time, false);
-    if (table_query_executor_ == nullptr) {
-        table_query_executor_ = new TableQueryExecutor(read_file_);
-    }
+    ensure_table_query_executor(-1);
     ret = table_query_executor_->query_on_tree(
         satisfied_device_ids, columns_names, measurement_names_to_query,
         time_filter, result_set);
@@ -334,9 +346,16 @@ int TsFileReader::get_timeseries_schema(
                          device_id, timeseries_indexs, pa))) {
     } else {
         for (auto timeseries_index : timeseries_indexs) {
+            auto* aligned_timeseries_index =
+                dynamic_cast<AlignedTimeseriesIndex*>(timeseries_index);
+            auto data_type =
+                aligned_timeseries_index != nullptr &&
+                        aligned_timeseries_index->value_ts_idx_ != nullptr
+                    ? aligned_timeseries_index->value_ts_idx_->get_data_type()
+                    : timeseries_index->get_data_type();
             MeasurementSchema ms(
                 timeseries_index->get_measurement_name().to_std_string(),
-                timeseries_index->get_data_type());
+                data_type);
             result.push_back(ms);
         }
     }
diff --git a/cpp/src/reader/tsfile_reader.h b/cpp/src/reader/tsfile_reader.h
index e78a38ac..19d83ec6 100644
--- a/cpp/src/reader/tsfile_reader.h
+++ b/cpp/src/reader/tsfile_reader.h
@@ -233,6 +233,7 @@ class TsFileReader {
     std::vector<std::shared_ptr<TableSchema>> get_all_table_schemas();
 
    private:
+    int ensure_table_query_executor(int batch_size);
     int get_timeseries_metadata_impl(
         std::shared_ptr<IDeviceID> device_id,
         std::vector<std::shared_ptr<ITimeseriesIndex>>& result);
@@ -242,6 +243,7 @@ class TsFileReader {
     storage::ReadFile* read_file_;
     storage::TsFileExecutor* tsfile_executor_;
     storage::TableQueryExecutor* table_query_executor_;
+    int table_query_executor_batch_size_;
     common::PageArena tsfile_reader_meta_pa_;
 };
 
diff --git a/python/tests/test_reader_metadata.py 
b/python/tests/test_reader_metadata.py
index 558fcbb1..c8fdf571 100644
--- a/python/tests/test_reader_metadata.py
+++ b/python/tests/test_reader_metadata.py
@@ -18,9 +18,20 @@
 import os
 import tempfile
 
+import pandas as pd
 import pytest
 
-from tsfile import Field, RowRecord, TimeseriesSchema, TsFileReader, 
TsFileWriter
+from tsfile import (
+    ColumnCategory,
+    ColumnSchema,
+    Field,
+    RowRecord,
+    TableSchema,
+    TimeseriesSchema,
+    TsFileReader,
+    TsFileTableWriter,
+    TsFileWriter,
+)
 from tsfile import TSDataType
 from tsfile.schema import (
     BoolTimeseriesStatistic,
@@ -210,3 +221,54 @@ def test_get_timeseries_metadata_string_statistic():
             os.unlink(path)
         except OSError:
             pass
+
+
+def test_get_timeseries_metadata_table_timeline_statistic_keeps_null_rows():
+    path = os.path.join(tempfile.gettempdir(), 
"py_reader_metadata_table_timeline.tsfile")
+    try:
+        os.unlink(path)
+    except OSError:
+        pass
+
+    schema = TableSchema(
+        "weather",
+        [
+            ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
+            ColumnSchema("temperature", TSDataType.DOUBLE, 
ColumnCategory.FIELD),
+            ColumnSchema("humidity", TSDataType.DOUBLE, ColumnCategory.FIELD),
+        ],
+    )
+    df = pd.DataFrame(
+        {
+            "time": [0, 1],
+            "device": ["device_a", "device_a"],
+            "temperature": [float("nan"), 21.0],
+            "humidity": [50.0, 51.0],
+        }
+    )
+    with TsFileTableWriter(path, schema) as writer:
+        writer.write_dataframe(df)
+
+    reader = TsFileReader(path)
+    try:
+        meta_all = reader.get_timeseries_metadata(None)
+        group = meta_all[next(iter(meta_all))]
+        by_name = {item.measurement_name: item for item in group.timeseries}
+
+        temperature = by_name["temperature"]
+        assert temperature.statistic.row_count == 1
+        assert temperature.statistic.start_time == 1
+        assert temperature.statistic.end_time == 1
+        assert temperature.timeline_statistic.row_count == 2
+        assert temperature.timeline_statistic.start_time == 0
+        assert temperature.timeline_statistic.end_time == 1
+
+        humidity = by_name["humidity"]
+        assert humidity.statistic.row_count == 2
+        assert humidity.timeline_statistic.row_count == 2
+    finally:
+        reader.close()
+        try:
+            os.unlink(path)
+        except OSError:
+            pass
diff --git a/python/tests/test_tsfile_dataset.py 
b/python/tests/test_tsfile_dataset.py
index 63cd439a..379bb547 100644
--- a/python/tests/test_tsfile_dataset.py
+++ b/python/tests/test_tsfile_dataset.py
@@ -20,9 +20,11 @@ import numpy as np
 import pandas as pd
 import pytest
 
+from tsfile.dataset import dataframe as dataframe_module
 from tsfile import ColumnCategory, ColumnSchema, TSDataType, TableSchema, 
TsFileTableWriter
 from tsfile import AlignedTimeseries, Timeseries, TsFileDataFrame
 from tsfile.dataset.formatting import format_timestamp
+from tsfile.dataset.metadata import MetadataCatalog, build_series_path, 
resolve_series_path
 from tsfile.dataset.reader import TsFileSeriesReader
 
 
@@ -47,6 +49,20 @@ def _write_weather_file(path, start):
         writer.write_dataframe(df)
 
 
+def _write_weather_rows_file(path, rows):
+    schema = TableSchema(
+        "weather",
+        [
+            ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
+            ColumnSchema("temperature", TSDataType.DOUBLE, 
ColumnCategory.FIELD),
+            ColumnSchema("humidity", TSDataType.DOUBLE, ColumnCategory.FIELD),
+        ],
+    )
+    df = pd.DataFrame(rows)
+    with TsFileTableWriter(str(path), schema) as writer:
+        writer.write_dataframe(df)
+
+
 def _write_numeric_and_text_file(path):
     schema = TableSchema(
         "weather",
@@ -222,10 +238,14 @@ def 
test_dataset_exposes_only_numeric_fields_and_keeps_nan(tmp_path):
 
         series = tsdf[0]
         assert series.name == "weather.device_a.temperature"
+        assert len(series) == 3
+        assert series.stats == {"start_time": 0, "end_time": 2, "count": 3}
         assert np.isnan(series[1])
-        sliced = series[:3]
+        np.testing.assert_array_equal(series.timestamps, np.array([0, 1, 2], 
dtype=np.int64))
+        sliced = series[:]
         assert sliced.shape == (3,)
         assert np.isnan(sliced[1])
+        assert sliced[2] == 23.5
         assert series[1:1].shape == (0,)
 
 
@@ -260,8 +280,45 @@ def 
test_dataset_rejects_duplicate_timestamps_across_shards(tmp_path):
     _write_weather_file(path1, 0)
     _write_weather_file(path2, 2)
 
-    with pytest.raises(ValueError, match="Duplicate timestamp"):
-        TsFileDataFrame([str(path1), str(path2)], show_progress=False)
+    with TsFileDataFrame([str(path1), str(path2)], show_progress=False) as 
tsdf:
+        series = tsdf["weather.device_a.temperature"]
+        with pytest.raises(ValueError, match="Duplicate timestamp"):
+            _ = series.timestamps
+
+
+def 
test_dataset_overlap_position_access_avoids_full_timestamp_materialization(tmp_path,
 monkeypatch):
+    path1 = tmp_path / "part1.tsfile"
+    path2 = tmp_path / "part2.tsfile"
+    _write_weather_rows_file(
+        path1,
+        {
+            "time": [0, 2, 4],
+            "device": ["device_a", "device_a", "device_a"],
+            "temperature": [10.0, 30.0, 50.0],
+            "humidity": [100.0, 300.0, 500.0],
+        },
+    )
+    _write_weather_rows_file(
+        path2,
+        {
+            "time": [1, 3, 5],
+            "device": ["device_a", "device_a", "device_a"],
+            "temperature": [20.0, 40.0, 60.0],
+            "humidity": [200.0, 400.0, 600.0],
+        },
+    )
+
+    def fail_merge(*_args, **_kwargs):
+        raise AssertionError("full timestamp merge should not run for overlap 
position reads")
+
+    monkeypatch.setattr(dataframe_module, "_merge_field_timestamps", 
fail_merge)
+
+    with TsFileDataFrame([str(path1), str(path2)], show_progress=False) as 
tsdf:
+        series = tsdf["weather.device_a.temperature"]
+        assert series[0] == 10.0
+        assert series[1] == 20.0
+        assert series[4] == 50.0
+        np.testing.assert_array_equal(series[1:5], np.array([20.0, 30.0, 40.0, 
50.0]))
 
 
 def test_dataset_rejects_data_access_after_close(tmp_path):
@@ -389,9 +446,31 @@ def 
test_reader_catalog_shares_device_metadata_and_resolves_paths(tmp_path):
         by_ref = reader.get_series_info_by_ref(0, 0)
         assert by_ref == by_path
         assert by_ref["tag_values"] == {"device": "device_a"}
-
-        ts_by_path = 
reader.get_series_timestamps("weather.device_a.temperature")
-        ts_by_device = reader.get_device_timestamps(0)
-        np.testing.assert_array_equal(ts_by_path, ts_by_device)
+        ts_arr, values = reader.read_series_by_ref(0, 0, 100, 102)
+        np.testing.assert_array_equal(ts_arr, np.array([100, 101, 102]))
+        np.testing.assert_array_equal(values, np.array([20.0, 21.5, 23.0]))
     finally:
         reader.close()
+
+
+def test_series_path_resolution_allows_prefix_tag_values():
+    catalog = MetadataCatalog()
+    table_id = catalog.add_table(
+        "weather",
+        ("site", "device", "region"),
+        (TSDataType.STRING, TSDataType.STRING, TSDataType.STRING),
+        ("temperature",),
+    )
+    device_id = catalog.add_device(table_id, ("site_a", "device_a"), 0, 1)
+    catalog.series_stats_by_ref[(device_id, 0)] = {
+        "length": 1,
+        "min_time": 0,
+        "max_time": 0,
+        "timeline_length": 1,
+        "timeline_min_time": 0,
+        "timeline_max_time": 0,
+    }
+
+    series_path = build_series_path(catalog, device_id, 0)
+    assert series_path == "weather.site_a.device_a.temperature"
+    assert resolve_series_path(catalog, series_path) == (table_id, device_id, 
0)
diff --git a/python/tsfile/__init__.py b/python/tsfile/__init__.py
index 31390d9c..e74c5982 100644
--- a/python/tsfile/__init__.py
+++ b/python/tsfile/__init__.py
@@ -20,14 +20,19 @@ import ctypes
 import os
 import sys
 
+_pkg_dir = os.path.dirname(os.path.abspath(__file__))
+
 if sys.platform == "win32":
-    _pkg_dir = os.path.dirname(os.path.abspath(__file__))
     os.add_dll_directory(_pkg_dir)
     # Preload libtsfile.dll with absolute path to bypass DLL search issues.
     # This ensures it's already in memory when .pyd extensions reference it.
     _tsfile_dll = os.path.join(_pkg_dir, "libtsfile.dll")
     if os.path.isfile(_tsfile_dll):
         ctypes.CDLL(_tsfile_dll)
+elif sys.platform == "darwin":
+    _tsfile_dylib = os.path.join(_pkg_dir, "libtsfile.dylib")
+    if os.path.isfile(_tsfile_dylib):
+        ctypes.CDLL(_tsfile_dylib, mode=os.RTLD_GLOBAL)
 
 from .constants import *
 from .schema import *
diff --git a/python/tsfile/dataset/dataframe.py 
b/python/tsfile/dataset/dataframe.py
index e28a0e5e..baef77c3 100644
--- a/python/tsfile/dataset/dataframe.py
+++ b/python/tsfile/dataset/dataframe.py
@@ -20,6 +20,7 @@
 
 from collections import defaultdict
 from dataclasses import dataclass, field
+import heapq
 import os
 import sys
 from typing import Dict, List, Set, Tuple, Union
@@ -40,9 +41,15 @@ DeviceRef = Tuple[object, int]
 
 _QUERY_START = np.iinfo(np.int64).min
 _QUERY_END = np.iinfo(np.int64).max
+_DATACLASS_SLOTS = {"slots": True} if sys.version_info >= (3, 10) else {}
+# Overlap position reads use chunked k-way merge. Keep the default chunk small
+# enough to avoid large read amplification for `series[i]` / short slices, but
+# large enough to avoid excessive query_by_row round-trips when overlap spans
+# multiple shards.
+_OVERLAP_ROW_CHUNK_SIZE = 256
 
 
-@dataclass(slots=True)
+@dataclass(**_DATACLASS_SLOTS)
 class _LogicalIndex:
     """Cross-reader logical mapping for devices and series."""
 
@@ -64,7 +71,7 @@ class _LogicalIndex:
     series_ref_set: Set[SeriesRefKey] = field(default_factory=set)
 
 
-@dataclass(slots=True)
+@dataclass(**_DATACLASS_SLOTS)
 class _DerivedCache:
     """Merged metadata derived from the logical index."""
 
@@ -159,25 +166,55 @@ def _register_reader(
 
 
 def _build_device_entry(refs: List[DeviceRef]) -> dict:
-    """Compute per-device time bounds after merging all contributing shards."""
-    # [Temporary] It will be replaced by query_by_row and metadata interface 
in TsFile
-    if len(refs) == 1:
-        merged_timestamps = refs[0][0].get_device_timestamps(refs[0][1])
-    else:
-        merged_timestamps = merge_timestamp_parts(
-            [reader.get_device_timestamps(device_id) for reader, device_id in 
refs],
-            validate_unique=True,
-        )
+    """Compute per-device time bounds from cheap metadata only.
+
+    We intentionally do not validate duplicates at the device level because
+    table-model fields do not necessarily share one complete timestamp axis.
+    Duplicate detection stays on the logical-series paths that materialize or
+    merge one field's timestamps.
+    """
+    infos = [reader.get_device_info(device_id) for reader, device_id in refs]
+    min_time = min(info["min_time"] for info in infos)
+    max_time = max(info["max_time"] for info in infos)
+
+    return {
+        "min_time": min_time,
+        "max_time": max_time,
+    }
+
+
+def _build_runtime_series_stats(refs: List[SeriesRef]) -> dict:
+    """Build shared-timeline series stats from native timeline metadata."""
+    min_time = None
+    max_time = None
+    count = 0
+
+    for reader, device_id, field_idx in refs:
+        info = reader.get_series_info_by_ref(device_id, field_idx)
+        shard_min = info["timeline_min_time"]
+        shard_max = info["timeline_max_time"]
+        shard_count = info["timeline_length"]
+
+        if shard_count == 0:
+            continue
+
+        count += shard_count
+        min_time = shard_min if min_time is None else min(min_time, shard_min)
+        max_time = shard_max if max_time is None else max(max_time, shard_max)
 
     return {
-        "min_time": int(merged_timestamps[0]) if len(merged_timestamps) > 0 
else None,
-        "max_time": int(merged_timestamps[-1]) if len(merged_timestamps) > 0 
else None,
+        "min_time": min_time,
+        "max_time": max_time,
+        "count": count,
     }
 
 
 def _merge_field_timestamps(series_name: str, refs: List[SeriesRef]) -> 
np.ndarray:
     """Load and merge the full timestamp axis for one logical series on 
demand."""
-    # [Temporary] It will be replaced by query_by_row interface in TsFile
+    # This is intentionally lazy because it is one of the most expensive 
dataset
+    # paths: it reads the full timestamp axis for the logical series across all
+    # shards. Today this happens only when callers explicitly ask for
+    # `Timeseries.timestamps`.
     time_parts = []
     for reader, device_id, field_idx in refs:
         ts_arr, _ = reader.read_series_by_ref(device_id, field_idx, 
_QUERY_START, _QUERY_END)
@@ -203,17 +240,166 @@ def _merge_field_timestamps(series_name: str, refs: 
List[SeriesRef]) -> np.ndarr
     return merged_timestamps
 
 
+def _read_field_by_position(
+    series_name: str,
+    refs: List[SeriesRef],
+    offset: int,
+    limit: int,
+) -> Tuple[np.ndarray, np.ndarray]:
+    """Read one logical series by global position without materializing 
timestamps for non-overlapping shards."""
+    if limit <= 0:
+        return np.array([], dtype=np.int64), np.array([], dtype=np.float64)
+
+    infos = []
+    for reader, device_id, field_idx in refs:
+        series_info = reader.get_series_info_by_ref(device_id, field_idx)
+        infos.append(
+            {
+                "length": series_info["timeline_length"],
+                "min_time": series_info["timeline_min_time"],
+                "max_time": series_info["timeline_max_time"],
+                "table_name": series_info["table_name"],
+                "column_name": series_info["column_name"],
+                "device_id": series_info["device_id"],
+                "field_idx": series_info["field_idx"],
+                "tag_columns": series_info["tag_columns"],
+                "tag_values": series_info["tag_values"],
+            }
+        )
+    ordered = sorted(zip(refs, infos), key=lambda item: (item[1]["min_time"], 
item[1]["max_time"]))
+    if _has_time_range_overlap([info for _, info in ordered]):
+        return _read_field_by_position_overlap(series_name, ordered, offset, 
limit)
+
+    remaining_offset = offset
+    remaining_limit = limit
+    time_parts = []
+    value_parts = []
+    for (reader, device_id, field_idx), info in ordered:
+        shard_count = info["length"]
+        if remaining_offset >= shard_count:
+            remaining_offset -= shard_count
+            continue
+        local_limit = min(remaining_limit, shard_count - remaining_offset)
+        ts_arr, values = reader.read_series_by_row(device_id, field_idx, 
remaining_offset, local_limit)
+        if len(ts_arr) > 0:
+            time_parts.append(ts_arr)
+            value_parts.append(values)
+        remaining_limit -= local_limit
+        remaining_offset = 0
+        if remaining_limit <= 0:
+            break
+
+    if not time_parts:
+        return np.array([], dtype=np.int64), np.array([], dtype=np.float64)
+    return np.concatenate(time_parts), np.concatenate(value_parts)
+
+
+def _has_time_range_overlap(infos: List[dict]) -> bool:
+    previous_max = None
+    for info in infos:
+        if info["min_time"] is None or info["max_time"] is None:
+            continue
+        if previous_max is not None and info["min_time"] <= previous_max:
+            return True
+        previous_max = info["max_time"] if previous_max is None else 
max(previous_max, info["max_time"])
+    return False
+
+
+def _read_field_by_position_overlap(
+    series_name: str,
+    ordered: List[Tuple[SeriesRef, dict]],
+    offset: int,
+    limit: int,
+) -> Tuple[np.ndarray, np.ndarray]:
+    """Merge overlapping shard streams lazily until the requested global 
window is covered."""
+    total_count = sum(info["length"] for _, info in ordered)
+    if offset >= total_count:
+        return np.array([], dtype=np.int64), np.array([], dtype=np.float64)
+
+    chunk_size = max(_OVERLAP_ROW_CHUNK_SIZE, limit)
+    states = []
+    heap = []
+
+    def fill_state(state_idx: int) -> bool:
+        state = states[state_idx]
+        while state["buffer_index"] >= len(state["timestamps"]):
+            remaining = state["length"] - state["next_offset"]
+            if remaining <= 0:
+                state["exhausted"] = True
+                return False
+
+            local_limit = min(chunk_size, remaining)
+            reader, device_id, field_idx = state["ref"]
+            ts_arr, val_arr = reader.read_series_by_row(device_id, field_idx, 
state["next_offset"], local_limit)
+            state["next_offset"] += len(ts_arr)
+            state["timestamps"] = ts_arr
+            state["values"] = val_arr
+            state["buffer_index"] = 0
+            if len(ts_arr) > 0:
+                return True
+
+            state["exhausted"] = True
+            return False
+        return True
+
+    for ref, info in ordered:
+        state_idx = len(states)
+        states.append(
+            {
+                "ref": ref,
+                "length": info["length"],
+                "next_offset": 0,
+                "timestamps": np.array([], dtype=np.int64),
+                "values": np.array([], dtype=np.float64),
+                "buffer_index": 0,
+                "exhausted": False,
+            }
+        )
+        if fill_state(state_idx):
+            heapq.heappush(heap, (int(states[state_idx]["timestamps"][0]), 
state_idx))
+
+    skipped = 0
+    output_timestamps = []
+    output_values = []
+    last_timestamp = None
+
+    while heap and len(output_timestamps) < limit:
+        current_ts, state_idx = heapq.heappop(heap)
+        if last_timestamp is not None and current_ts == last_timestamp:
+            raise ValueError(
+                f"Duplicate timestamp {current_ts} found for series 
'{series_name}' across shards. "
+                f"Cross-shard duplicate timestamps are not supported."
+            )
+
+        state = states[state_idx]
+        buffer_index = state["buffer_index"]
+        current_value = float(state["values"][buffer_index])
+        state["buffer_index"] += 1
+        if fill_state(state_idx):
+            next_ts = int(state["timestamps"][state["buffer_index"]])
+            heapq.heappush(heap, (next_ts, state_idx))
+
+        last_timestamp = current_ts
+        if skipped < offset:
+            skipped += 1
+            continue
+
+        output_timestamps.append(current_ts)
+        output_values.append(current_value)
+
+    return np.asarray(output_timestamps, dtype=np.int64), 
np.asarray(output_values, dtype=np.float64)
+
 def _build_field_stats(refs: List[SeriesRef]) -> dict:
-    """Aggregate cheap per-shard stats without materializing full series 
values."""
+    """Aggregate per-series timeline statistics for dataframe display."""
     min_time = None
     max_time = None
     count = 0
 
     for reader, device_id, field_idx in refs:
         info = reader.get_series_info_by_ref(device_id, field_idx)
-        shard_min = info["min_time"]
-        shard_max = info["max_time"]
-        shard_count = info["length"]
+        shard_min = info["timeline_min_time"]
+        shard_max = info["timeline_max_time"]
+        shard_count = info["timeline_length"]
 
         if shard_count == 0:
             continue
@@ -437,7 +623,7 @@ class TsFileDataFrame:
 
         table_entry = self._index.table_entries[table_name]
         expected_parts = len(table_entry.tag_columns) + 2
-        if len(parts) != expected_parts:
+        if len(parts) > expected_parts:
             raise KeyError(_series_lookup_hint(series_name))
 
         field_name = parts[-1]
@@ -490,9 +676,12 @@ class TsFileDataFrame:
         return Timeseries(
             series_name,
             self._index.series_ref_map[series_ref],
-            self._cache.field_stats[series_ref],
+            
_build_runtime_series_stats(self._index.series_ref_map[series_ref]),
             self._assert_open,
             lambda: _merge_field_timestamps(series_name, 
self._index.series_ref_map[series_ref]),
+            lambda offset, limit: _read_field_by_position(
+                series_name, self._index.series_ref_map[series_ref], offset, 
limit
+            ),
         )
 
     def __getitem__(self, key):
diff --git a/python/tsfile/dataset/metadata.py 
b/python/tsfile/dataset/metadata.py
index 5c4611e0..195e3e9f 100644
--- a/python/tsfile/dataset/metadata.py
+++ b/python/tsfile/dataset/metadata.py
@@ -19,18 +19,17 @@
 """Shared metadata models for dataset readers and views."""
 
 from dataclasses import dataclass, field
+import sys
 from typing import Any, Dict, Iterable, Iterator, List, Tuple
 
-import numpy as np
-
 from ..constants import TSDataType
 
 
 _PATH_SEPARATOR = "."
 _PATH_ESCAPE = "\\"
+_DATACLASS_SLOTS = {"slots": True} if sys.version_info >= (3, 10) else {}
 
-
-@dataclass(slots=True)
+@dataclass(**_DATACLASS_SLOTS)
 class TableEntry:
     """Schema-level metadata shared by every device in one table."""
 
@@ -49,7 +48,7 @@ class TableEntry:
         return self._field_index_by_name[field_name]
 
 
-@dataclass(slots=True)
+@dataclass(**_DATACLASS_SLOTS)
 class DeviceEntry:
     """One logical device identified by table_id + ordered tag values.
 
@@ -58,13 +57,11 @@ class DeviceEntry:
 
     table_id: int
     tag_values: Tuple[Any, ...]
-    timestamps: np.ndarray
-    length: int
     min_time: int
     max_time: int
 
 
-@dataclass(slots=True)
+@dataclass(**_DATACLASS_SLOTS)
 class MetadataCatalog:
     """Canonical metadata store shared by dataset readers and dataframes."""
 
@@ -72,6 +69,7 @@ class MetadataCatalog:
     device_entries: List[DeviceEntry] = field(default_factory=list)
     table_id_by_name: Dict[str, int] = field(default_factory=dict)
     device_id_by_key: Dict[Tuple[int, tuple], int] = 
field(default_factory=dict)
+    series_stats_by_ref: Dict[Tuple[int, int], Dict[str, int]] = 
field(default_factory=dict)
 
     def add_table(
         self,
@@ -92,24 +90,24 @@ class MetadataCatalog:
         self.table_id_by_name[table_name] = table_id
         return table_id
 
-    def add_device(self, table_id: int, tag_values: tuple, timestamps: 
np.ndarray) -> int:
+    def add_device(
+        self,
+        table_id: int,
+        tag_values: tuple,
+        min_time: int,
+        max_time: int,
+    ) -> int:
         key = (table_id, tuple(tag_values))
         if key in self.device_id_by_key:
             return self.device_id_by_key[key]
 
-        timestamps = np.sort(timestamps)
-        if len(timestamps) == 0:
-            raise ValueError("Cannot register a device without timestamps.")
-
         device_id = len(self.device_entries)
         self.device_entries.append(
             DeviceEntry(
                 table_id=table_id,
                 tag_values=tuple(tag_values),
-                timestamps=timestamps,
-                length=len(timestamps),
-                min_time=int(timestamps[0]),
-                max_time=int(timestamps[-1]),
+                min_time=min_time,
+                max_time=max_time,
             )
         )
         self.device_id_by_key[key] = device_id
@@ -190,7 +188,7 @@ def resolve_series_path(catalog: MetadataCatalog, 
series_path: str) -> Tuple[int
     table_id = catalog.table_id_by_name[table_name]
     table_entry = catalog.table_entries[table_id]
     expected_parts = len(table_entry.tag_columns) + 2
-    if len(parts) != expected_parts:
+    if len(parts) > expected_parts:
         raise ValueError(f"Series not found: {series_path}")
 
     field_name = parts[-1]
diff --git a/python/tsfile/dataset/reader.py b/python/tsfile/dataset/reader.py
index d7358023..953a22cb 100644
--- a/python/tsfile/dataset/reader.py
+++ b/python/tsfile/dataset/reader.py
@@ -23,9 +23,9 @@ import sys
 from typing import Dict, Iterator, List, Tuple
 
 import numpy as np
-import pyarrow.compute as pc
 
 from ..constants import ColumnCategory, TSDataType
+from ..tag_filter import tag_eq
 from ..tsfile_reader import TsFileReaderPy
 from .metadata import MetadataCatalog, build_series_path, iter_series_refs, 
resolve_series_path
 
@@ -44,6 +44,14 @@ def _to_python_scalar(value):
     return value.item() if hasattr(value, "item") else value
 
 
+def _build_exact_tag_filter(tag_values: Dict[str, object]):
+    tag_filter = None
+    for tag_column, tag_value in tag_values.items():
+        expr = tag_eq(tag_column, str(tag_value))
+        tag_filter = expr if tag_filter is None else tag_filter & expr
+    return tag_filter
+
+
 class TsFileSeriesReader:
     """Wrap ``TsFileReaderPy`` with numeric dataset discovery and batch 
reads."""
 
@@ -103,14 +111,14 @@ class TsFileSeriesReader:
             ) from e
 
     def _cache_metadata_table_model(self):
-        """Build the in-memory catalog by scanning table batches from the 
file."""
+        """Build the in-memory catalog from table schemas and native 
metadata."""
         table_schemas = self._reader.get_all_table_schemas()
         if not table_schemas:
             raise ValueError("No tables found in TsFile")
 
         self._catalog = MetadataCatalog()
-        total_rows = 0
         table_names = list(table_schemas.keys())
+        metadata_groups = self._reader.get_timeseries_metadata(None)
 
         for table_index, table_name in enumerate(table_names):
             table_schema = table_schemas[table_name]
@@ -138,86 +146,111 @@ class TsFileSeriesReader:
                 continue
 
             table_id = self._catalog.add_table(table_name, tag_columns, 
tag_types, field_columns)
-            time_arrays = []
-            tag_arrays = {tag_column: [] for tag_column in tag_columns}
-
-            # [Temporary] It will be replaced by new tsfile api, we won't 
query all the data later.
-            query_columns = tag_columns + field_columns
-
-            with self._reader.query_table(table_name, query_columns, 
batch_size=65536) as result_set:
-                while True:
-                    arrow_table = result_set.read_arrow_batch()
-                    if arrow_table is None:
-                        break
-                    batch_rows = arrow_table.num_rows
-                    total_rows += batch_rows
-                    time_arrays.append(arrow_table.column("time").to_numpy())
-                    for tag_column in tag_columns:
-                        
tag_arrays[tag_column].append(arrow_table.column(tag_column).to_numpy())
-
-                    if self.show_progress:
-                        sys.stderr.write(
-                            f"\rReading TsFile metadata: table {table_index + 
1}/{len(table_names)} "
-                            f"[{table_name}] ({total_rows:,} rows)"
-                        )
-                        sys.stderr.flush()
-
-            if not time_arrays:
-                continue
-
-            timestamps = np.concatenate(time_arrays).astype(np.int64)
-            if not tag_columns:
-                self._add_device(table_id, (), timestamps)
-                continue
-
-            for tag_values, device_timestamps in 
self._iter_device_groups(tag_columns, timestamps, tag_arrays):
-                self._add_device(table_id, tag_values, device_timestamps)
-
-        if self.show_progress and total_rows > 0:
+            table_groups = [
+                group
+                for group in metadata_groups.values()
+                if (group.table_name or "").lower() == table_name.lower()
+            ]
+            table_groups.sort(key=lambda group: tuple("" if value is None else 
str(value) for value in group.segments))
+
+            for group in table_groups:
+                stats = self._metadata_device_stats(group)
+                if stats is None:
+                    continue
+                tag_values = self._metadata_tag_values(group, len(tag_columns))
+                device_id = self._add_device(table_id, tag_values, 
stats["min_time"], stats["max_time"])
+
+                stats_by_field = self._metadata_field_stats(group)
+                table_entry = self._catalog.table_entries[table_id]
+                for field_idx, field_name in 
enumerate(table_entry.field_columns):
+                    field_stats = stats_by_field.get(field_name)
+                    if field_stats is None:
+                        self._catalog.series_stats_by_ref[(device_id, 
field_idx)] = {
+                            "length": 0,
+                            "min_time": None,
+                            "max_time": None,
+                            "timeline_length": 0,
+                            "timeline_min_time": None,
+                            "timeline_max_time": None,
+                        }
+                    else:
+                        self._catalog.series_stats_by_ref[(device_id, 
field_idx)] = field_stats
+
+            if self.show_progress:
+                sys.stderr.write(
+                    f"\rReading TsFile metadata: table {table_index + 
1}/{len(table_names)} "
+                    f"[{table_name}]"
+                )
+                sys.stderr.flush()
+
+        if self.show_progress and self.series_count > 0:
             sys.stderr.write(
-                f"\rReading TsFile metadata: {len(table_names)} table(s), 
{total_rows:,} rows, "
-                f"{self.series_count} series ... done\n"
+                f"\rReading TsFile metadata: {len(table_names)} table(s), 
{self.series_count} series ... done\n"
             )
             sys.stderr.flush()
 
         if self.series_count == 0:
             raise ValueError("No valid numeric series found in TsFile")
 
-    def _iter_device_groups(
-        self,
-        tag_columns: List[str],
-        timestamps: np.ndarray,
-        tag_arrays: Dict[str, list],
-    ) -> Iterator[Tuple[tuple, np.ndarray]]:
-        """Group one table's rows by tag tuple while preserving original row 
membership."""
-        tag_values_by_column = {column: np.concatenate(tag_arrays[column]) for 
column in tag_columns}
-
-        n = len(timestamps)
-        arrays = [tag_values_by_column[col] for col in tag_columns]
-        dtype = np.dtype([(col, arrays[i].dtype) for i, col in 
enumerate(tag_columns)])
-        composite = np.empty(n, dtype=dtype)
-        for i, col in enumerate(tag_columns):
-            composite[col] = arrays[i]
-
-        _, inverse, counts = np.unique(composite, return_inverse=True, 
return_counts=True)
-        ordered_indices = np.argsort(inverse, kind="stable")
-        group_bounds = np.cumsum(counts)[:-1]
-        for group_indices in np.split(ordered_indices, group_bounds):
-            first = int(group_indices[0])
-            tag_tuple = tuple(_to_python_scalar(composite[col][first]) for col 
in tag_columns)
-            yield tag_tuple, timestamps[group_indices]
+    @staticmethod
+    def _metadata_device_stats(group) -> dict:
+        """Derive cheap device-level metadata hints from native field 
statistics.
+
+        Callers must treat them as pruning/display hints rather than exact
+        logical-series timeline semantics.
+        """
+        statistics = [
+            timeseries.timeline_statistic
+            for timeseries in group.timeseries
+            if timeseries.timeline_statistic.has_statistic and 
timeseries.timeline_statistic.row_count > 0
+        ]
+        if not statistics:
+            return None
+
+        return {
+            "min_time": min(int(statistic.start_time) for statistic in 
statistics),
+            "max_time": max(int(statistic.end_time) for statistic in 
statistics),
+        }
+
+    @staticmethod
+    def _metadata_tag_values(group, tag_count: int) -> tuple:
+        """Extract ordered table tag values from IDeviceID segments.
+
+        A table-model DeviceID may only materialize a prefix of the declared
+        tag columns. Preserve the available prefix rather than requiring a
+        full-length tag tuple here.
+        """
+        if tag_count == 0:
+            return ()
+        return tuple(group.segments[1 : min(len(group.segments), 1 + 
tag_count)])
+
+    @staticmethod
+    def _metadata_field_stats(group) -> Dict[str, dict]:
+        stats = {}
+        for timeseries in group.timeseries:
+            statistic = timeseries.statistic
+            timeline_statistic = timeseries.timeline_statistic
+            if not timeline_statistic.has_statistic or 
timeline_statistic.row_count <= 0:
+                continue
+            stats[timeseries.measurement_name] = {
+                "length": int(statistic.row_count) if statistic.has_statistic 
else 0,
+                "min_time": int(statistic.start_time) if 
statistic.has_statistic else None,
+                "max_time": int(statistic.end_time) if statistic.has_statistic 
else None,
+                "timeline_length": int(timeline_statistic.row_count),
+                "timeline_min_time": int(timeline_statistic.start_time),
+                "timeline_max_time": int(timeline_statistic.end_time),
+            }
+        return stats
 
     def _add_device(
         self,
         table_id: int,
         tag_values: tuple,
-        timestamps: np.ndarray,
+        min_time: int,
+        max_time: int,
     ):
         """Add one device to the catalog."""
-        if len(timestamps) == 0:
-            return
-
-        self._catalog.add_device(table_id, tag_values, timestamps)
+        return self._catalog.add_device(table_id, tag_values, min_time, 
max_time)
 
     def _resolve_series_path(self, series_path: str) -> Tuple[int, int, int]:
         return resolve_series_path(self._catalog, series_path)
@@ -236,20 +269,20 @@ class TsFileSeriesReader:
             "table_name": table_entry.table_name,
             "tag_columns": table_entry.tag_columns,
             "tag_values": dict(zip(table_entry.tag_columns, 
device_entry.tag_values)),
-            "length": device_entry.length,
             "min_time": device_entry.min_time,
             "max_time": device_entry.max_time,
         }
 
-    def get_device_timestamps(self, device_id: int) -> np.ndarray:
-        return self._catalog.device_entries[device_id].timestamps
-
     def get_series_info_by_ref(self, device_id: int, field_idx: int) -> dict:
         table_entry, device_entry, field_name = 
self._resolve_series_ref(device_id, field_idx)
+        field_stats = self._catalog.series_stats_by_ref[(device_id, field_idx)]
         return {
-            "length": device_entry.length,
-            "min_time": device_entry.min_time,
-            "max_time": device_entry.max_time,
+            "length": field_stats["length"],
+            "min_time": field_stats["min_time"],
+            "max_time": field_stats["max_time"],
+            "timeline_length": field_stats["timeline_length"],
+            "timeline_min_time": field_stats["timeline_min_time"],
+            "timeline_max_time": field_stats["timeline_max_time"],
             "table_name": table_entry.table_name,
             "column_name": field_name,
             "device_id": device_id,
@@ -262,10 +295,6 @@ class TsFileSeriesReader:
         device_id, field_idx = self._resolve_series_path(series_path)[1:]
         return self.get_series_info_by_ref(device_id, field_idx)
 
-    def get_series_timestamps(self, series_path: str) -> np.ndarray:
-        device_id = self._resolve_series_path(series_path)[1]
-        return self.get_device_timestamps(device_id)
-
     def read_series_by_ref(self, device_id: int, field_idx: int, start_time: 
int, end_time: int) -> Tuple[np.ndarray, np.ndarray]:
         table_entry, _, field_name = self._resolve_series_ref(device_id, 
field_idx)
         timestamps, field_values = 
self.read_device_fields_by_time_range(device_id, [field_idx], start_time, 
end_time)
@@ -277,6 +306,28 @@ class TsFileSeriesReader:
         _, device_id, field_idx = self._resolve_series_path(series_path)
         return self.read_series_by_ref(device_id, field_idx, start_time, 
end_time)
 
+    def read_series_by_row(self, device_id: int, field_idx: int, offset: int, 
limit: int) -> Tuple[np.ndarray, np.ndarray]:
+        """Read one logical series by device-local row offset/limit."""
+        table_entry, device_entry, field_name = 
self._resolve_series_ref(device_id, field_idx)
+        tag_values = dict(zip(table_entry.tag_columns, 
device_entry.tag_values))
+        tag_filter = _build_exact_tag_filter(tag_values) if tag_values else 
None
+
+        timestamps = []
+        values = []
+        with self._reader.query_table_by_row(
+            table_entry.table_name,
+            [field_name],
+            offset=offset,
+            limit=limit,
+            tag_filter=tag_filter,
+        ) as result_set:
+            while result_set.next():
+                timestamps.append(result_set.get_value_by_name("time"))
+                value = result_set.get_value_by_name(field_name)
+                values.append(np.nan if value is None else float(value))
+
+        return np.asarray(timestamps, dtype=np.int64), np.asarray(values, 
dtype=np.float64)
+
     def read_device_fields_by_time_range(
         self, device_id: int, field_indices: List[int], start_time: int, 
end_time: int
     ) -> Tuple[np.ndarray, Dict[str, np.ndarray]]:
@@ -303,18 +354,20 @@ class TsFileSeriesReader:
         start_time: int,
         end_time: int,
     ) -> Tuple[np.ndarray, Dict[str, np.ndarray]]:
-        """Execute the underlying table query, then apply tag filtering 
client-side."""
+        """Execute the underlying table query with exact tag filter 
pushdown."""
         tag_columns = list(tag_columns)
         field_columns = list(field_columns)
-        query_columns = tag_columns + field_columns if tag_columns else 
list(field_columns)
+        query_columns = list(field_columns)
         timestamp_parts = []
         field_parts = {field_column: [] for field_column in field_columns}
+        tag_filter = _build_exact_tag_filter(tag_values) if tag_values else 
None
 
         with self._reader.query_table(
             table_name,
             query_columns,
             start_time=start_time,
             end_time=end_time,
+            tag_filter=tag_filter,
             batch_size=65536,
         ) as result_set:
             while True:
@@ -322,13 +375,6 @@ class TsFileSeriesReader:
                 if arrow_table is None:
                     break
 
-                if tag_values:
-                    mask = None
-                    for tag_column, tag_value in tag_values.items():
-                        column_mask = pc.equal(arrow_table.column(tag_column), 
tag_value)
-                        mask = column_mask if mask is None else pc.and_(mask, 
column_mask)
-                    arrow_table = arrow_table.filter(mask)
-
                 if arrow_table.num_rows == 0:
                     continue
 
diff --git a/python/tsfile/dataset/timeseries.py 
b/python/tsfile/dataset/timeseries.py
index 35f1614d..97a92d99 100644
--- a/python/tsfile/dataset/timeseries.py
+++ b/python/tsfile/dataset/timeseries.py
@@ -70,12 +70,14 @@ class Timeseries:
         stats: dict,
         ensure_open: Callable[[], None],
         load_timestamps: Callable[[], np.ndarray],
+        read_by_position: Callable[[int, int], Tuple[np.ndarray, np.ndarray]],
     ):
         self._name = name
         self._series_refs = series_refs
         self._stats = dict(stats)
         self._ensure_open = ensure_open
         self._load_timestamps = load_timestamps
+        self._read_by_position = read_by_position
         self._timestamps = None
 
     @property
@@ -101,32 +103,37 @@ class Timeseries:
         return self._stats["count"]
 
     def __getitem__(self, key):
-        timestamps = self.timestamps
-        length = len(timestamps)
+        self._ensure_open()
+        length = len(self)
 
         if isinstance(key, int):
             if key < 0:
                 key += length
             if key < 0 or key >= length:
                 raise IndexError(f"Index {key} out of range [0, {length})")
-            ts = int(timestamps[key])
-            _, values = self._query_time_range(ts, ts)
+            _, values = self._read_by_position(key, 1)
             return float(values[0]) if len(values) > 0 else None
 
         if isinstance(key, slice):
-            requested_ts = timestamps[key]
-            if len(requested_ts) == 0:
+            start, stop, step = key.indices(length)
+            if (step > 0 and start >= stop) or (step < 0 and start <= stop):
                 return np.array([], dtype=np.float64)
 
-            ts_arr, values = self._query_time_range(int(np.min(requested_ts)), 
int(np.max(requested_ts)))
-            result = np.full(len(requested_ts), np.nan)
-            if len(ts_arr) > 0:
-                indices = np.searchsorted(ts_arr, requested_ts)
-                valid = (indices < len(ts_arr)) & (
-                    ts_arr[np.minimum(indices, len(ts_arr) - 1)] == 
requested_ts
-                )
-                result[valid] = values[indices[valid]]
-            return result
+            # The common case is a forward contiguous slice like [:], [a:b], or
+            # [a:b:1]. Avoid materializing the full position list for large
+            # series; read the window directly.
+            if step == 1:
+                _, values = self._read_by_position(start, stop - start)
+                return values
+
+            positions = np.arange(start, stop, step, dtype=np.int64)
+            min_pos = int(positions.min())
+            max_pos = int(positions.max())
+            _, values = self._read_by_position(min_pos, max_pos - min_pos + 1)
+            if len(values) == 0:
+                return np.array([], dtype=np.float64)
+            relative = positions - min_pos
+            return values[relative]
 
         raise TypeError(f"Unsupported key type: {type(key)}")
 
@@ -135,8 +142,8 @@ class Timeseries:
         time_parts = []
         value_parts = []
         for reader, device_id, field_idx in self._series_refs:
-            device_timestamps = reader.get_device_timestamps(device_id)
-            if device_timestamps[-1] < start_time or device_timestamps[0] > 
end_time:
+            device_info = reader.get_device_info(device_id)
+            if device_info["max_time"] < start_time or device_info["min_time"] 
> end_time:
                 continue
             ts_arr, val_arr = reader.read_series_by_ref(device_id, field_idx, 
start_time, end_time)
             if len(ts_arr) > 0:
diff --git a/python/tsfile/schema.py b/python/tsfile/schema.py
index ce85f683..88ba1b60 100644
--- a/python/tsfile/schema.py
+++ b/python/tsfile/schema.py
@@ -99,6 +99,7 @@ class TimeseriesMetadata:
     data_type: TSDataType
     chunk_meta_count: int
     statistic: TimeseriesStatisticType
+    timeline_statistic: TimeseriesStatisticType
 
 
 @dataclass(frozen=True)
diff --git a/python/tsfile/tsfile_cpp.pxd b/python/tsfile/tsfile_cpp.pxd
index 71d60250..0fa570df 100644
--- a/python/tsfile/tsfile_cpp.pxd
+++ b/python/tsfile/tsfile_cpp.pxd
@@ -159,6 +159,7 @@ cdef extern from "cwrapper/tsfile_cwrapper.h":
         TSDataType data_type
         int32_t chunk_meta_count
         TimeseriesStatistic statistic
+        TimeseriesStatistic timeline_statistic
 
     ctypedef struct DeviceID:
         char * path
diff --git a/python/tsfile/tsfile_py_cpp.pyx b/python/tsfile/tsfile_py_cpp.pyx
index 67ea9b34..70518b70 100644
--- a/python/tsfile/tsfile_py_cpp.pyx
+++ b/python/tsfile/tsfile_py_cpp.pyx
@@ -1034,11 +1034,13 @@ cdef object 
timeseries_metadata_c_to_py(TimeseriesMetadata* m):
     else:
         name_py = m.measurement_name.decode('utf-8')
     cdef object stat = timeseries_statistic_c_to_py(&m.statistic)
+    cdef object timeline_stat = 
timeseries_statistic_c_to_py(&m.timeline_statistic)
     return TimeseriesMetadataPy(
         name_py,
         TSDataTypePy(m.data_type),
         int(m.chunk_meta_count),
         stat,
+        timeline_stat,
     )
 
 cdef tuple c_device_segments_to_tuple(char** segs, uint32_t n):
diff --git a/python/tsfile/tsfile_reader.pyx b/python/tsfile/tsfile_reader.pyx
index dbc598f3..9193e2c6 100644
--- a/python/tsfile/tsfile_reader.pyx
+++ b/python/tsfile/tsfile_reader.pyx
@@ -445,6 +445,7 @@ cdef class TsFileReaderPy:
                                                       [column_name.lower() for 
column_name in column_names],
                                                       offset, limit, 
c_tag_filter, batch_size)
         pyresult = ResultSetPy(self)
+        pyresult._tag_filter_handle = c_tag_filter
         pyresult.init_c(result, table_name)
         self.activate_result_set_list.add(pyresult)
         return pyresult

Reply via email to