This is an automated email from the ASF dual-hosted git repository.

colinlee pushed a commit to branch support_arrow_struct
in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit a4fc6be0689a557fe8c686c94b06096e860990af Author: ColinLee <[email protected]> AuthorDate: Sun Mar 8 23:16:19 2026 +0800 fix benchmark. --- cpp/src/reader/table_query_executor.cc | 5 +- .../table_view/tsfile_reader_table_batch_test.cc | 4 - python/Untitled | 1 - python/lower_case_name.tsfile | Bin 23089 -> 23089 bytes python/requirements.txt | 1 - python/test1.tsfile | Bin 23089 -> 23089 bytes python/tests/bench_batch_arrow_vs_dataframe.py | 87 +++++---------------- 7 files changed, 23 insertions(+), 75 deletions(-) diff --git a/cpp/src/reader/table_query_executor.cc b/cpp/src/reader/table_query_executor.cc index 2a01a6d5c..8f71d7625 100644 --- a/cpp/src/reader/table_query_executor.cc +++ b/cpp/src/reader/table_query_executor.cc @@ -83,8 +83,9 @@ int TableQueryExecutor::query(const std::string& table_name, ret = common::E_UNSUPPORTED_ORDER; } assert(tsblock_reader != nullptr); - ret_qds = new TableResultSet(std::move(tsblock_reader), - lower_case_column_names, data_types); + ret_qds = + new TableResultSet(std::move(tsblock_reader), lower_case_column_names, + data_types, batch_mode_); return ret; } diff --git a/cpp/test/reader/table_view/tsfile_reader_table_batch_test.cc b/cpp/test/reader/table_view/tsfile_reader_table_batch_test.cc index ece8ea7bf..1298157e2 100644 --- a/cpp/test/reader/table_view/tsfile_reader_table_batch_test.cc +++ b/cpp/test/reader/table_view/tsfile_reader_table_batch_test.cc @@ -208,7 +208,6 @@ TEST_F(TsFileTableReaderBatchTest, BatchQueryWithSmallBatchSize) { std::strcpy(literal, "device_id"); String expected_string(literal, std::strlen("device_id")); std::vector<int64_t> int64_sums(3, 0); - std::cout << "begin to start" << std::endl; while ((ret = table_result_set->get_next_tsblock(block)) == common::E_OK) { ASSERT_NE(block, nullptr); block_count++; @@ -231,7 +230,6 @@ TEST_F(TsFileTableReaderBatchTest, BatchQueryWithSmallBatchSize) { int64_t int_val = *reinterpret_cast<const int64_t*>(value); int64_sums[int64_col_idx] += int_val; 
int64_col_idx++; - std::cout << "to add" << int_val << std::endl; } else if (data_type == TSDataType::STRING) { String str_value(value, len); ASSERT_EQ(str_value.compare(expected_string), 0); @@ -240,8 +238,6 @@ TEST_F(TsFileTableReaderBatchTest, BatchQueryWithSmallBatchSize) { row_iterator.next(); } } - std::cout << "finish with ret" << ret << std::endl; - std::cout << "check finished" << std::endl; EXPECT_EQ(total_rows, device_num * points_per_device); EXPECT_GT(block_count, 1); for (size_t i = 0; i < int64_sums.size(); i++) { diff --git a/python/Untitled b/python/Untitled deleted file mode 100644 index 4197b94e5..000000000 --- a/python/Untitled +++ /dev/null @@ -1 +0,0 @@ -**/node_modules/** \ No newline at end of file diff --git a/python/lower_case_name.tsfile b/python/lower_case_name.tsfile index 732a9f94e..d4717671d 100644 Binary files a/python/lower_case_name.tsfile and b/python/lower_case_name.tsfile differ diff --git a/python/requirements.txt b/python/requirements.txt index fcb05c3ae..56cb04036 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -21,7 +21,6 @@ cython==3.0.10 numpy==1.26.4 pandas==2.2.2 setuptools==78.1.1 -<<<<<<< HEAD wheel==0.46.2 pyarrow diff --git a/python/test1.tsfile b/python/test1.tsfile index 85264f12c..1141f08d9 100644 Binary files a/python/test1.tsfile and b/python/test1.tsfile differ diff --git a/python/tests/bench_batch_arrow_vs_dataframe.py b/python/tests/bench_batch_arrow_vs_dataframe.py index 082e0c7a8..e1f6c421a 100644 --- a/python/tests/bench_batch_arrow_vs_dataframe.py +++ b/python/tests/bench_batch_arrow_vs_dataframe.py @@ -33,6 +33,7 @@ import time from os import remove import pandas as pd +import pyarrow as pa import pytest from tsfile import ( @@ -40,7 +41,6 @@ from tsfile import ( ColumnCategory, TSDataType, TableSchema, - Tablet, TsFileReader, TsFileTableWriter, ) @@ -48,7 +48,6 @@ from tsfile import ( # Default benchmark size DEFAULT_ROW_COUNT = 50_000 DEFAULT_BATCH_SIZE = 4096 
-DEFAULT_WARMUP_ROUNDS = 1 DEFAULT_TIMED_ROUNDS = 3 BENCH_FILE = "bench_arrow_vs_dataframe.tsfile" @@ -64,7 +63,7 @@ def _ensure_bench_tsfile(file_path: str, row_count: int) -> None: import numpy as np df = pd.DataFrame({ "time": np.arange(row_count, dtype=np.int64), - "device": pd.Series([f"device_{i}" for i in range(row_count)]), + "device": pd.Series([f"device" for i in range(row_count)]), "value1": np.arange(0, row_count * 10, 10, dtype=np.int64), "value2": np.arange(row_count, dtype=np.float64) * 1.5, }) @@ -77,34 +76,12 @@ def _ensure_bench_tsfile(file_path: str, row_count: int) -> None: ColumnSchema("value2", TSDataType.DOUBLE, ColumnCategory.FIELD), ], ) - chunk = min(row_count, 10_000) - tablet = Tablet( - COLUMNS, - [TSDataType.STRING, TSDataType.INT64, TSDataType.DOUBLE], - chunk, - ) with TsFileTableWriter(file_path, table) as writer: - for start in range(0, row_count, chunk): - end = min(start + chunk, row_count) - n = end - start - chunk_df = df.iloc[start:end] - # Bulk-fill tablet from DataFrame (column order: device, value1, value2) - tablet.timestamp_list[:n] = chunk_df["time"].tolist() - tablet.data_list[0][:n] = chunk_df["device"].tolist() - tablet.data_list[1][:n] = chunk_df["value1"].tolist() - tablet.data_list[2][:n] = chunk_df["value2"].tolist() - # Unused rows must be None so C side skips them - for i in range(n, chunk): - tablet.timestamp_list[i] = None - for col_idx in range(len(COLUMNS)): - tablet.data_list[col_idx][i] = None - writer.write_table(tablet) + writer.write_dataframe(df) def _read_via_arrow(file_path: str, batch_size: int, end_time: int) -> int: """Read all rows using query_table_batch + read_arrow_batch. 
Returns total rows.""" - import pyarrow as pa # noqa: F401 - reader = TsFileReader(file_path) result_set = reader.query_table_batch( table_name=TABLE_NAME, @@ -165,32 +142,14 @@ def _run_timed(name: str, func, *args, rounds: int = DEFAULT_TIMED_ROUNDS): def run_benchmark( row_count: int = DEFAULT_ROW_COUNT, batch_size: int = DEFAULT_BATCH_SIZE, - warmup_rounds: int = DEFAULT_WARMUP_ROUNDS, timed_rounds: int = DEFAULT_TIMED_ROUNDS, file_path: str = BENCH_FILE, ): - try: - import pyarrow as pa # noqa: F401 - pa_available = True - except ImportError: - pa_available = False - _ensure_bench_tsfile(file_path, row_count) end_time = row_count + 1 - print(f"Benchmark: {row_count} rows, batch_size={batch_size}") - print(f" warmup_rounds={warmup_rounds}, timed_rounds={timed_rounds}") - if not pa_available: - print(" pyarrow not installed; Arrow path skipped.") - - # Warmup - for _ in range(warmup_rounds): - _read_via_dataframe(file_path, batch_size, end_time) - if pa_available: - for _ in range(warmup_rounds): - _read_via_arrow(file_path, batch_size, end_time) + print(f"Benchmark: {row_count} rows, batch_size={batch_size}, timed_rounds={timed_rounds}") - # Timed runs df_avg, df_rows = _run_timed( "query_table + read_data_frame", _read_via_dataframe, @@ -200,26 +159,23 @@ def run_benchmark( rounds=timed_rounds, ) - if pa_available: - arrow_avg, arrow_rows = _run_timed( - "query_table_batch + read_arrow_batch", - _read_via_arrow, - file_path, - batch_size, - end_time, - rounds=timed_rounds, - ) - print() - if df_avg > 0: - speedup = arrow_avg / df_avg - print(f" Arrow vs DataFrame time ratio: {speedup:.2f}x ({'Arrow faster' if speedup < 1 else 'DataFrame faster'})") - assert df_rows == row_count, f"DataFrame path row count {df_rows} != {row_count}" - assert arrow_rows == row_count, f"Arrow path row count {arrow_rows} != {row_count}" - else: - assert df_rows == row_count, f"DataFrame path row count {df_rows} != {row_count}" + arrow_avg, arrow_rows = _run_timed( + 
"query_table_batch + read_arrow_batch", + _read_via_arrow, + file_path, + batch_size, + end_time, + rounds=timed_rounds, + ) + print() + if df_avg > 0: + speedup = arrow_avg / df_avg + print(f" Arrow vs DataFrame time ratio: {speedup:.2f}x ({'Arrow faster' if speedup < 1 else 'DataFrame faster'})") + assert df_rows == row_count, f"DataFrame path row count {df_rows} != {row_count}" + assert arrow_rows == row_count, f"Arrow path row count {arrow_rows} != {row_count}" print() - return (df_avg, arrow_avg) if pa_available else (df_avg, None) + return df_avg, arrow_avg def test_bench_arrow_vs_dataframe_default(): @@ -227,7 +183,6 @@ def test_bench_arrow_vs_dataframe_default(): run_benchmark( row_count=5_000, batch_size=1024, - warmup_rounds=0, timed_rounds=2, ) @@ -237,16 +192,14 @@ def test_bench_arrow_vs_dataframe_medium(): run_benchmark( row_count=DEFAULT_ROW_COUNT, batch_size=DEFAULT_BATCH_SIZE, - warmup_rounds=DEFAULT_WARMUP_ROUNDS, timed_rounds=DEFAULT_TIMED_ROUNDS, ) def test_bench_arrow_vs_dataframe_large(): run_benchmark( - row_count=200_000, + row_count=2000_000, batch_size=8192, - warmup_rounds=1, timed_rounds=3, )
