This is an automated email from the ASF dual-hosted git repository.

colinlee pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git


The following commit(s) were added to refs/heads/develop by this push:
     new 89a31913 Fix py read error (#433)
89a31913 is described below

commit 89a3191340605e1e33eabafdfacb09d9a9ee4478
Author: Colin Lee <[email protected]>
AuthorDate: Mon Mar 10 15:44:09 2025 +0800

    Fix py read error (#433)
    
    1. fix data losing when query a table that not all devices have data.
    2. add support for py read table schema.
    3. add support for py read dataframe.
---
 cpp/src/common/tsblock/tsblock.h                   | 14 ++----
 .../reader/block/device_ordered_tsblock_reader.cc  | 18 +++++--
 .../reader/block/single_device_tsblock_reader.cc   | 27 ++++++++--
 .../reader/block/single_device_tsblock_reader.h    |  6 ++-
 cpp/src/reader/table_query_executor.h              |  2 +-
 cpp/src/reader/table_result_set.cc                 | 14 ++++--
 .../writer/table_view/tsfile_writer_table_test.cc  | 21 +++++---
 cpp/test/writer/tsfile_writer_test.cc              |  8 ++-
 python/tests/test_write_and_read.py                | 48 +++++++++++++-----
 python/tsfile/schema.py                            | 17 ++++++-
 python/tsfile/tsfile_cpp.pxd                       | 27 ++++++----
 python/tsfile/tsfile_py_cpp.pxd                    |  6 ++-
 python/tsfile/tsfile_py_cpp.pyx                    | 38 ++++++++++++++-
 python/tsfile/tsfile_reader.pyx                    | 57 +++++++++++++++++-----
 14 files changed, 228 insertions(+), 75 deletions(-)

diff --git a/cpp/src/common/tsblock/tsblock.h b/cpp/src/common/tsblock/tsblock.h
index 9bc865b0..47f110cc 100644
--- a/cpp/src/common/tsblock/tsblock.h
+++ b/cpp/src/common/tsblock/tsblock.h
@@ -50,10 +50,11 @@ class TsBlock {
           max_row_count_(max_row_count),
           tuple_desc_(tupledesc) {}
 
-    virtual ~TsBlock() {
+    ~TsBlock() {
         int size = vectors_.size();
         for (int i = 0; i < size; ++i) {
             delete vectors_[i];
+            vectors_[i] = nullptr;
         }
     }
 
@@ -227,16 +228,7 @@ class RowIterator {
         column_count_ = tsblock_->tuple_desc_->get_column_count();
     }
 
-    ~RowIterator() {
-        /*
-         * if use RowIterator and ColIterator at the same time,
-         * need to reset the offset after one is used,
-         * otherwise it will cause the offset to be wrong
-         */
-        for (uint32_t i = 0; i < column_count_; ++i) {
-            tsblock_->vectors_[i]->reset_offset();
-        }
-    }
+    ~RowIterator() {}
 
     FORCE_INLINE bool end() { return row_id_ >= tsblock_->row_count_; }
 
diff --git a/cpp/src/reader/block/device_ordered_tsblock_reader.cc 
b/cpp/src/reader/block/device_ordered_tsblock_reader.cc
index 191ed607..7469cc8b 100644
--- a/cpp/src/reader/block/device_ordered_tsblock_reader.cc
+++ b/cpp/src/reader/block/device_ordered_tsblock_reader.cc
@@ -33,7 +33,7 @@ int DeviceOrderedTsBlockReader::has_next(bool &has_next) {
     while (device_task_iterator_->has_next()) {
         DeviceQueryTask *task = nullptr;
         if (IS_FAIL(device_task_iterator_->next(task))) {
-            has_next = false;
+            return ret;
         }
         if (current_reader_) {
             delete current_reader_;
@@ -43,15 +43,25 @@ int DeviceOrderedTsBlockReader::has_next(bool &has_next) {
             task, block_size_, metadata_querier_, tsfile_io_reader_, 
time_filter_,
             field_filter_);
         if (current_reader_ == nullptr) {
-            has_next = false;
             return common::E_OOM;
         }
+
         if (RET_FAIL(current_reader_->has_next(has_next))) {
             return ret;
-        } else {
-            return common::E_OK;
+        }
+        // If current device has data, just return.
+        if (has_next) {
+            return ret;
+        }
+        // If current device does not have data, get next device.
+
+        // Free current device reader.
+        if (current_reader_) {
+            delete current_reader_;
+            current_reader_ = nullptr;
         }
     }
+    has_next = false;
     return ret;
 }
 
diff --git a/cpp/src/reader/block/single_device_tsblock_reader.cc 
b/cpp/src/reader/block/single_device_tsblock_reader.cc
index 99c8e3ed..ec8059f9 100644
--- a/cpp/src/reader/block/single_device_tsblock_reader.cc
+++ b/cpp/src/reader/block/single_device_tsblock_reader.cc
@@ -74,6 +74,13 @@ int SingleDeviceTsBlockReader::init(DeviceQueryTask* 
device_query_task,
         construct_column_context(time_series_index, time_filter);
     }
 
+    // There is no data in this single device tsblock reader.
+    if (field_column_contexts_.empty()) {
+        delete current_block_;
+        current_block_ = nullptr;
+        return common::E_OK;
+    }
+
     for (const auto& id_column :
          device_query_task->get_column_mapping()->get_id_columns()) {
         const auto& column_pos_in_result =
@@ -233,6 +240,10 @@ void SingleDeviceTsBlockReader::close() {
     if (device_query_task_) {
         device_query_task_->~DeviceQueryTask();
     }
+    if (current_block_) {
+        delete current_block_;
+        current_block_ = nullptr;
+    }
 }
 
 int SingleDeviceTsBlockReader::construct_column_context(
@@ -258,6 +269,7 @@ int SingleDeviceTsBlockReader::construct_column_context(
                 device_query_task_->get_column_mapping()->get_column_pos(
                     time_series_index->get_measurement_name().to_std_string()),
                 pa_))) {
+            delete column_context;
             return ret;
         }
         field_column_contexts_.insert(std::make_pair(
@@ -266,11 +278,15 @@ int SingleDeviceTsBlockReader::construct_column_context(
     } else {
         SingleMeasurementColumnContext* column_context =
             new SingleMeasurementColumnContext(tsfile_io_reader_);
-        column_context->init(
-            device_query_task_, time_series_index, time_filter,
-            device_query_task_->get_column_mapping()->get_column_pos(
-                time_series_index->get_measurement_name().to_std_string()),
-            pa_);
+        if (RET_FAIL(column_context->init(
+                device_query_task_, time_series_index, time_filter,
+                device_query_task_->get_column_mapping()->get_column_pos(
+                    time_series_index->get_measurement_name().to_std_string()),
+                pa_))) {
+            delete column_context;
+            return ret;
+        }
+
         field_column_contexts_.insert(std::make_pair(
             time_series_index->get_measurement_name().to_std_string(),
             column_context));
@@ -370,4 +386,5 @@ void SingleMeasurementColumnContext::fill_into(
         col_appenders[pos + 1]->append(val, len);
     }
 }
+
 }  // namespace storage
diff --git a/cpp/src/reader/block/single_device_tsblock_reader.h 
b/cpp/src/reader/block/single_device_tsblock_reader.h
index d2949cd4..a94aed31 100644
--- a/cpp/src/reader/block/single_device_tsblock_reader.h
+++ b/cpp/src/reader/block/single_device_tsblock_reader.h
@@ -112,8 +112,11 @@ class SingleMeasurementColumnContext final : public 
MeasurementColumnContext {
             delete value_iter_;
             value_iter_ = nullptr;
         }
-        ssi_->revert_tsblock();
+        if (ssi_) {
+            ssi_->revert_tsblock();
+        }
         tsfile_io_reader_->revert_ssi(ssi_);
+        ssi_ = nullptr;
     }
 
     void fill_into(std::vector<common::ColAppender*>& col_appenders) override;
@@ -128,6 +131,7 @@ class SingleMeasurementColumnContext final : public 
MeasurementColumnContext {
     int get_current_value(char*& value, uint32_t& len) override;
     int move_iter() override;
 
+
    private:
     std::string column_name_;
     std::vector<int32_t> pos_in_result_;
diff --git a/cpp/src/reader/table_query_executor.h 
b/cpp/src/reader/table_query_executor.h
index e9a3c513..32d522c0 100644
--- a/cpp/src/reader/table_query_executor.h
+++ b/cpp/src/reader/table_query_executor.h
@@ -50,7 +50,7 @@ class TableQueryExecutor {
         tsfile_io_reader_->init(read_file);
         meta_data_querier_ = new MetadataQuerier(tsfile_io_reader_);
         table_query_ordering_ = TableQueryOrdering::DEVICE;
-        block_size_ = 10240;
+        block_size_ = 1024;
     }
     ~TableQueryExecutor() {
         if (meta_data_querier_ != nullptr) {
diff --git a/cpp/src/reader/table_result_set.cc 
b/cpp/src/reader/table_result_set.cc
index a6c8f963..8eb5d9dd 100644
--- a/cpp/src/reader/table_result_set.cc
+++ b/cpp/src/reader/table_result_set.cc
@@ -39,9 +39,16 @@ int TableResultSet::next(bool& has_next) {
     while (row_iterator_ == nullptr || !row_iterator_->has_next()) {
         if (RET_FAIL(tsblock_reader_->has_next(has_next))) {
             return ret;
-        } else if (!has_next) {
+        }
+
+        if (!has_next) {
+            if (row_iterator_) {
+                delete row_iterator_;
+                row_iterator_ = nullptr;
+            }
             break;
         }
+
         if (RET_FAIL(tsblock_reader_->next(tsblock_))) {
             break;
         }
@@ -49,6 +56,7 @@ int TableResultSet::next(bool& has_next) {
             delete row_iterator_;
             row_iterator_ = nullptr;
         }
+
         row_iterator_ = new common::RowIterator(tsblock_);
     }
     if (row_iterator_ == nullptr || !row_iterator_->has_next()) {
@@ -100,10 +108,6 @@ void TableResultSet::close() {
         delete row_iterator_;
         row_iterator_ = nullptr;
     }
-    if (tsblock_) {
-        delete tsblock_;
-        tsblock_ = nullptr;
-    }
 }
 
 }  // namespace storage
\ No newline at end of file
diff --git a/cpp/test/writer/table_view/tsfile_writer_table_test.cc 
b/cpp/test/writer/table_view/tsfile_writer_table_test.cc
index a616b7fa..bc152571 100644
--- a/cpp/test/writer/table_view/tsfile_writer_table_test.cc
+++ b/cpp/test/writer/table_view/tsfile_writer_table_test.cc
@@ -182,7 +182,7 @@ TEST_F(TsFileWriterTableTest, WriteNonExistTableTest) {
     delete table_schema;
 }
 
-TEST_F(TsFileWriterTableTest, DISABLED_WriteAndReadSimple) {
+TEST_F(TsFileWriterTableTest, WriteAndReadSimple) {
     std::vector<MeasurementSchema*> measurement_schemas;
     std::vector<ColumnCategory> column_categories;
     measurement_schemas.resize(2);
@@ -210,19 +210,26 @@ TEST_F(TsFileWriterTableTest, 
DISABLED_WriteAndReadSimple) {
     TsFileReader reader = TsFileReader();
     reader.open(write_file_.get_file_path());
     ResultSet* ret = nullptr;
-    int ret_value =
-        reader.query("test_table", {"device", "value"}, 10, 50, ret);
+    int ret_value = reader.query("test_table", {"device", "value"}, 0, 50, 
ret);
     ASSERT_EQ(common::E_OK, ret_value);
 
+    ASSERT_EQ(ret_value, 0);
     auto* table_result_set = (TableResultSet*)ret;
     bool has_next = false;
-    // There may be error in AlignedChunkReader::read_from_file_and_rewrap.
-    // read_from_file_and_rewrap::read_file_->read 308 may get E_FILE_READ_ERR.
+    int cur_line = 0;
     while (IS_SUCC(table_result_set->next(has_next)) && has_next) {
-        std::cout << table_result_set->get_value<double>("value");
-        std::cout << table_result_set->get_value<common::String*>("device");
+        cur_line++;
+        int64_t timestamp = table_result_set->get_value<int64_t>("time");
+        ASSERT_EQ(table_result_set->get_value<common::String*>("device")
+                      ->to_std_string(),
+                  "device" + to_string(timestamp));
+        ASSERT_EQ(table_result_set->get_value<double>("value"),
+                  timestamp * 1.1);
     }
+    ASSERT_EQ(cur_line, 51);
+    table_result_set->close();
     reader.destroy_query_data_set(table_result_set);
+
     reader.close();
     delete table_schema;
 }
\ No newline at end of file
diff --git a/cpp/test/writer/tsfile_writer_test.cc 
b/cpp/test/writer/tsfile_writer_test.cc
index b17a126c..bd4a4793 100644
--- a/cpp/test/writer/tsfile_writer_test.cc
+++ b/cpp/test/writer/tsfile_writer_test.cc
@@ -50,7 +50,6 @@ class TsFileWriterTest : public ::testing::Test {
         ASSERT_EQ(tsfile_writer_->open(file_name_, flags, mode), common::E_OK);
     }
     void TearDown() override {
-        tsfile_writer_->close();
         delete tsfile_writer_;
         int ret = remove(file_name_.c_str());
         ASSERT_EQ(0, ret);
@@ -111,7 +110,9 @@ class TsFileWriterTest : public ::testing::Test {
     }
 };
 
-TEST_F(TsFileWriterTest, InitWithNullWriteFile) {
+class TsFileWriterTestSimple : public ::testing::Test{};
+
+TEST_F(TsFileWriterTestSimple, InitWithNullWriteFile) {
     TsFileWriter writer;
     ASSERT_EQ(writer.init(nullptr), E_INVALID_ARG);
 }
@@ -226,6 +227,9 @@ TEST_F(TsFileWriterTest, RegisterTimeSeries) {
                   storage::MeasurementSchema(measurement_name, data_type,
                                              encoding, compression_type)),
               E_OK);
+    ASSERT_EQ(tsfile_writer_->flush(), E_OK);
+    ASSERT_EQ(tsfile_writer_->close(), E_OK);
+
 }
 
 TEST_F(TsFileWriterTest, WriteMultipleRecords) {
diff --git a/python/tests/test_write_and_read.py 
b/python/tests/test_write_and_read.py
index ff86db7f..30013057 100644
--- a/python/tests/test_write_and_read.py
+++ b/python/tests/test_write_and_read.py
@@ -16,16 +16,16 @@
 # under the License.
 #
 
-import pytest
-
 import os
 
-from tsfile import TsFileWriter, TsFileReader, ColumnCategory
-from tsfile import TimeseriesSchema, DeviceSchema, ResultSetMetaData
+import pytest
+
 from tsfile import ColumnSchema, TableSchema
 from tsfile import TSDataType
 from tsfile import Tablet, RowRecord, Field
+from tsfile import TimeseriesSchema
 from tsfile import TsFileTableWriter
+from tsfile import TsFileWriter, TsFileReader, ColumnCategory
 
 
 def test_row_record_write_and_read():
@@ -103,7 +103,7 @@ def test_tablet_write_and_read():
             os.remove("tablet_write_and_read.tsfile")
 
 
-def test_table_writer():
+def test_table_writer_and_reader():
     table = TableSchema("test_table",
                         [ColumnSchema("device", TSDataType.STRING, 
ColumnCategory.TAG),
                          ColumnSchema("value", TSDataType.DOUBLE, 
ColumnCategory.FIELD)])
@@ -117,14 +117,36 @@ def test_table_writer():
                 tablet.add_value_by_index(1, i, i * 100.0)
             writer.write_table(tablet)
 
-        # with TsFileReader("table_write.tsfile") as reader:
-        #     pass
-        #     with reader.query_table("test_table", ["device", "value"],
-        #                             10, 50) as result:
-        #         while result.next():
-        #             print(result.get_value_by_name("device"))
-        #             print(result.get_value_by_name("value"))
-
+        with TsFileReader("table_write.tsfile") as reader:
+            with reader.query_table("test_table", ["device", "value"],
+                                    0, 10) as result:
+                cur_line = 0
+                while result.next():
+                    cur_time = result.get_value_by_name("time")
+                    assert result.get_value_by_name("device") == "device" + 
str(cur_time)
+                    assert result.get_value_by_name("value") == cur_time * 
100.0
+                    cur_line = cur_line + 1
+                assert cur_line == 11
+            with reader.query_table("test_table", ["device", "value"],
+                                    0, 100) as result:
+                line_num = 0
+                print("dataframe")
+                while result.next():
+                    data_frame = result.read_data_frame(max_row_num=30)
+                    if 100 - line_num >= 30:
+                        assert data_frame.shape == (30, 3)
+                    else:
+                        assert data_frame.shape == (100 - line_num, 3)
+                    line_num += len(data_frame)
+
+            schemas = reader.get_all_table_schemas()
+            assert len(schemas) == 1
+            assert schemas["test_table"] is not None
+            tableSchema = schemas["test_table"]
+            assert tableSchema.get_table_name() == "test_table"
+            print(tableSchema)
+            assert tableSchema.__repr__() == ("TableSchema(test_table, 
[ColumnSchema(device,"
+                                              " STRING, TAG), 
ColumnSchema(value, DOUBLE, FIELD)])")
     finally:
         if os.path.exists("table_write.tsfile"):
             os.remove("table_write.tsfile")
diff --git a/python/tsfile/schema.py b/python/tsfile/schema.py
index 19f7a02e..65189b70 100644
--- a/python/tsfile/schema.py
+++ b/python/tsfile/schema.py
@@ -78,6 +78,9 @@ class ColumnSchema:
         self.data_type = data_type
         self.category = category
 
+    def __repr__(self) -> str:
+        return f"ColumnSchema({self.column_name}, {self.data_type.name}, 
{self.category.name})"
+
     def get_column_name(self):
         return self.column_name
 
@@ -103,6 +106,9 @@ class TableSchema:
     def get_columns(self):
         return self.columns
 
+    def __repr__(self) -> str:
+        return f"TableSchema({self.table_name}, {self.columns})"
+
 
 class ResultSetMetaData:
     """Metadata container for query result sets (columns, types, table 
name)."""
@@ -127,8 +133,15 @@ class ResultSetMetaData:
             raise OverflowError
         return self.column_list[column_index - 1]
 
-    def get_column_name_index(self, column_name: str) -> int:
-        return self.column_list.index(self.table_name + "." + column_name) + 1
+    def get_column_name_index(self, column_name: str, is_tree: bool = False) 
-> int:
+        """
+        For Tree model, column is full path, column_name means sensor_name.
+        For Table model, column is just column name.
+        """
+        if is_tree:
+            return self.column_list.index(self.table_name + "." + column_name) 
+ 1
+        else:
+            return self.column_list.index(column_name) + 1
 
     def get_column_num(self):
         return len(self.column_list)
diff --git a/python/tsfile/tsfile_cpp.pxd b/python/tsfile/tsfile_cpp.pxd
index 9ebe8eee..b4f9ccf8 100644
--- a/python/tsfile/tsfile_cpp.pxd
+++ b/python/tsfile/tsfile_cpp.pxd
@@ -147,25 +147,33 @@ cdef extern from "./tsfile_cwrapper.h":
 
     # row_record
     TsRecord _ts_record_new(const char * device_id, int64_t timestamp, int 
timeseries_num);
-    ErrorCode _insert_data_into_ts_record_by_name_int32_t(TsRecord data, const 
char *measurement_name, const int32_t value);
-    ErrorCode _insert_data_into_ts_record_by_name_int64_t(TsRecord data, const 
char *measurement_name, const int64_t value);
+    ErrorCode _insert_data_into_ts_record_by_name_int32_t(TsRecord data, const 
char *measurement_name,
+                                                          const int32_t value);
+    ErrorCode _insert_data_into_ts_record_by_name_int64_t(TsRecord data, const 
char *measurement_name,
+                                                          const int64_t value);
     ErrorCode _insert_data_into_ts_record_by_name_float(TsRecord data, const 
char *measurement_name, const float value);
-    ErrorCode _insert_data_into_ts_record_by_name_double(TsRecord data, const 
char *measurement_name, const double value);
-    ErrorCode _insert_data_into_ts_record_by_name_bool(TsRecord data, const 
char *measurement_name,const  bint value);
-
-    void _free_tsfile_ts_record(TsRecord* record);
+    ErrorCode _insert_data_into_ts_record_by_name_double(TsRecord data, const 
char *measurement_name,
+                                                         const double value);
+    ErrorCode _insert_data_into_ts_record_by_name_bool(TsRecord data, const 
char *measurement_name, const  bint value);
 
+    void _free_tsfile_ts_record(TsRecord * record);
 
     # resulSet : query data from tsfile reader
     ResultSet tsfile_query_table(TsFileReader reader,
-                                        const char * table_name,
-                                        const char** columns, uint32_t 
column_num,
-                                        int64_t start_time, int64_t end_time, 
ErrorCode *err_code)
+                                 const char * table_name,
+                                 const char** columns, uint32_t column_num,
+                                 int64_t start_time, int64_t end_time, 
ErrorCode *err_code)
     ResultSet _tsfile_reader_query_device(TsFileReader reader,
                                           const char *device_name,
                                           char ** sensor_name, uint32_t 
sensor_num,
                                           int64_t start_time, int64_t 
end_time, ErrorCode *err_code)
 
+    TableSchema tsfile_reader_get_table_schema(TsFileReader reader,
+                                               const char * table_name);
+
+    TableSchema * tsfile_reader_get_all_table_schemas(TsFileReader reader,
+                                                      uint32_t * size);
+
     # resultSet : get data from resultSet
     bint tsfile_result_set_next(ResultSet result_set, ErrorCode * err_code);
     bint tsfile_result_set_is_null_by_index(ResultSet result_set, uint32_t 
column_index);
@@ -178,5 +186,6 @@ cdef extern from "./tsfile_cwrapper.h":
     float tsfile_result_set_get_value_by_index_float(ResultSet result_set, 
uint32_t column_index);
     double tsfile_result_set_get_value_by_index_double(ResultSet result_set, 
uint32_t column_index);
     char * tsfile_result_set_get_value_by_index_string(ResultSet result_set, 
uint32_t column_index);
+
     ResultSetMetaData tsfile_result_set_get_metadata(ResultSet result_set);
     void free_result_set_meta_data(ResultSetMetaData result_set_meta_data);
diff --git a/python/tsfile/tsfile_py_cpp.pxd b/python/tsfile/tsfile_py_cpp.pxd
index 926a56da..be1d5290 100644
--- a/python/tsfile/tsfile_py_cpp.pxd
+++ b/python/tsfile/tsfile_py_cpp.pxd
@@ -22,6 +22,8 @@ from .tsfile_cpp cimport *
 
 cdef public api inline void check_error(int errcode, const char* context=NULL) 
except *
 cdef public api object from_c_result_set_meta_data(ResultSetMetaData schema)
+cdef public api object from_c_column_schema(ColumnSchema schema)
+cdef public api object from_c_table_schema(TableSchema schema)
 cdef public api TSDataType to_c_data_type(object data_type)
 cdef public api TSEncoding to_c_encoding_type(object encoding_type)
 cdef public api CompressionType to_c_compression_type(object compression_type)
@@ -48,4 +50,6 @@ cdef public api bint 
tsfile_result_set_is_null_by_name_c(ResultSet result_set, o
 cdef public api ResultSet tsfile_reader_query_table_c(TsFileReader reader, 
object table_name, object column_list,
                                             int64_t start_time, int64_t 
end_time)
 cdef public api ResultSet tsfile_reader_query_paths_c(TsFileReader reader, 
object device_name, object sensor_list, int64_t start_time,
-                                                      int64_t end_time)
\ No newline at end of file
+                                                      int64_t end_time)
+cdef public api object get_table_schema(TsFileReader reader, object table_name)
+cdef public api object get_all_table_schema(TsFileReader reader)
\ No newline at end of file
diff --git a/python/tsfile/tsfile_py_cpp.pyx b/python/tsfile/tsfile_py_cpp.pyx
index 206850c1..649c0755 100644
--- a/python/tsfile/tsfile_py_cpp.pyx
+++ b/python/tsfile/tsfile_py_cpp.pyx
@@ -31,6 +31,7 @@ from tsfile.exceptions import ERROR_MAPPING
 from tsfile.schema import ResultSetMetaData as ResultSetMetaDataPy
 from tsfile.schema import TSDataType as TSDataTypePy, TSEncoding as 
TSEncodingPy
 from tsfile.schema import Compressor as CompressorPy, ColumnCategory as 
CategoryPy
+from tsfile.schema import TableSchema as TableSchemaPy, ColumnSchema as 
ColumnSchemaPy
 
 # check exception and set py exception object
 cdef inline void check_error(int errcode, const char* context=NULL) except *:
@@ -57,6 +58,23 @@ cdef object from_c_result_set_meta_data(ResultSetMetaData 
schema):
     result = ResultSetMetaDataPy(column_list, data_types)
     return result
 
+cdef object from_c_column_schema(ColumnSchema schema):
+    column_name = schema.column_name.decode('utf-8')
+    data_type = TSDataTypePy(schema.data_type)
+    category_type = CategoryPy(schema.column_category)
+    return ColumnSchemaPy(column_name, data_type, category_type)
+
+cdef object from_c_table_schema(TableSchema schema):
+    cdef int i
+    table_name = schema.table_name.decode('utf-8')
+    columns = []
+    for i in range(schema.column_num):
+        columns.append(from_c_column_schema(schema.column_schemas[i]))
+    free_c_table_schema(&schema)
+    return TableSchemaPy(table_name, columns)
+
+
+
 # Convert from python to c struct
 cdef dict TS_DATA_TYPE_MAP = {
     TSDataTypePy.BOOLEAN: TSDataType.TS_DATATYPE_BOOLEAN,
@@ -141,7 +159,6 @@ cdef TimeseriesSchema* to_c_timeseries_schema(object 
py_schema):
     return c_schema
 
 
-
 cdef DeviceSchema* to_c_device_schema(object py_schema):
     cdef DeviceSchema* c_schema
     c_schema = <DeviceSchema *> malloc(sizeof(DeviceSchema))
@@ -409,3 +426,22 @@ cdef ResultSet tsfile_reader_query_paths_c(TsFileReader 
reader, object device_na
             free(<void*>sensor_list_c)
             sensor_list_c = NULL
 
+cdef object get_table_schema(TsFileReader reader, object table_name):
+    cdef bytes table_name_bytes = PyUnicode_AsUTF8String(table_name)
+    cdef const char* table_name_c = table_name_bytes
+    cdef TableSchema schema = tsfile_reader_get_table_schema(reader, 
table_name_c)
+    return from_c_table_schema(schema)
+
+cdef object get_all_table_schema(TsFileReader reader):
+    cdef uint32_t table_num = 0
+    cdef TableSchema* schemas
+    cdef int i
+
+    table_schemas = {}
+    schemas = tsfile_reader_get_all_table_schemas(reader, &table_num)
+    for i in range(table_num):
+        schema_py = from_c_table_schema(schemas[i])
+        table_schemas.update([(schema_py.get_table_name(), schema_py)])
+    free(schemas)
+    return table_schemas
+
diff --git a/python/tsfile/tsfile_reader.pyx b/python/tsfile/tsfile_reader.pyx
index 8a1661d4..e168c400 100644
--- a/python/tsfile/tsfile_reader.pyx
+++ b/python/tsfile/tsfile_reader.pyx
@@ -49,11 +49,13 @@ cdef class ResultSetPy:
     cdef object valid
     # The reader
     cdef object tsfile_reader
+    cdef object is_tree
 
-    def __init__(self, tsfile_reader : TsFileReaderPy):
+    def __init__(self, tsfile_reader : TsFileReaderPy, is_tree: bint = False):
         self.metadata = None
         self.valid = True
         self.tsfile_reader = weakref.ref(tsfile_reader)
+        self.is_tree = is_tree
 
     cdef init_c(self, ResultSet result, object device_name):
         """
@@ -90,7 +92,7 @@ cdef class ResultSetPy:
             )
         }
 
-    def read_next_data_frame(self, max_row_num : int = 1024):
+    def read_data_frame(self, max_row_num : int = 1024):
         """
         :param max_row_num: default row num: 1024
         :return: a dataframe contains data from query result.
@@ -102,23 +104,40 @@ cdef class ResultSetPy:
         date_columns = [
             column_names[i]
             for i in range(column_num)
-            if self.metadata.get_data_type(i) == TSDataTypePy.DATE
+            if self.metadata.get_data_type(i + 1) == TSDataTypePy.DATE
         ]
 
-        data_type = [self.metadata.get_data_type(i).to_pandas_dtype() for i in 
range(column_num)]
+        data_type = [self.metadata.get_data_type(i + 1).to_pandas_dtype() for 
i in range(column_num)]
 
         data_container = {
             column_name: [] for column_name in column_names
         }
 
         cur_line = 0
-        while self.next() and cur_line < max_row_num:
-            row_data = (
-                self.get_value_by_index(i)
-                for i in range(column_num)
-            )
+
+        # User may call result_set.next() before or not, so we just get 
current data.
+        # if there is no data in result set, we just get a None list.
+        row_data = [
+            self.get_value_by_index(i + 1)
+            for i in range(column_num)
+        ]
+
+        if not all(value is None for value in row_data):
             for column_name, value in zip(column_names, row_data):
                 data_container[column_name].append(value)
+            cur_line += 1
+
+        while cur_line < max_row_num:
+            if self.next():
+                row_data = (
+                    self.get_value_by_index(i + 1)
+                    for i in range(column_num)
+                )
+                for column_name, value in zip(column_names, row_data):
+                    data_container[column_name].append(value)
+                cur_line += 1
+            else:
+                break
 
         df = pd.DataFrame(data_container)
         data_type_dict = {col: dtype for col, dtype in zip(column_names, 
data_type)}
@@ -171,10 +190,9 @@ cdef class ResultSetPy:
         """
         self.check_result_set_invalid()
         if tsfile_result_set_is_null_by_name_c(self.result, column_name):
-            print("Get None")
             return None
         # get index in metadata, metadata ind from 0.
-        ind = self.metadata.get_column_name_index(column_name)
+        ind = self.metadata.get_column_name_index(column_name, self.is_tree)
         return self.get_value_by_index(ind)
 
     def get_metadata(self):
@@ -200,7 +218,7 @@ cdef class ResultSetPy:
         Checks whether the field with the specified column name in the result 
set is null.
         """
         self.check_result_set_invalid()
-        ind = self.metadata.get_column_name_index(name)
+        ind = self.metadata.get_column_name_index(name, self.is_tree)
         return self.is_null_by_index(ind + 1)
 
     def check_result_set_invalid(self):
@@ -282,7 +300,7 @@ cdef class TsFileReaderPy:
         """
         cdef ResultSet result;
         result = tsfile_reader_query_paths_c(self.reader, device_name, 
sensor_list, start_time, end_time)
-        pyresult = ResultSetPy(self)
+        pyresult = ResultSetPy(self, True)
         pyresult.init_c(result, device_name)
         self.activate_result_set_list.add(pyresult)
         return pyresult
@@ -295,6 +313,19 @@ cdef class TsFileReaderPy:
         """
         self.activate_result_set_list.discard(result_set)
 
+    def get_table_schema(self, table_name : str):
+        """
+        Get table's schema with specify table name.
+        """
+        return get_table_schema(self.reader, table_name)
+
+    def get_all_table_schemas(self):
+        """
+        Get all tables schemas
+        """
+        return get_all_table_schema(self.reader)
+
+
     def close(self):
         """
         Close TsFile Reader, if reader has result sets, invalid them.

Reply via email to