westonpace commented on code in PR #13426:
URL: https://github.com/apache/arrow/pull/13426#discussion_r920715044
########## cpp/src/arrow/compute/exec/asof_join_benchmark.cc: ##########
@@ -0,0 +1,456 @@
+#include <boost/process.hpp>
+#include <string>
+
+#include "benchmark/benchmark.h"
+
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/csv/writer.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/filesystem/api.h"
+#include "arrow/ipc/api.h"
+#include "arrow/table.h"
+#include "arrow/testing/future_util.h"
+
+namespace arrow {
+namespace compute {
+
+static const char* time_col = "time";
+static const char* key_col = "id";
+static bool createdBenchmarkFiles = false;
+static std::shared_ptr<arrow::internal::TemporaryDir> temp_dir =
+    arrow::internal::TemporaryDir::Make("asof_benchmarks").ValueOrDie();
+
+struct ReaderNodeTableProperties {
+  ExecNode* execNode;
+  size_t total_rows;
+  size_t total_bytes;
+};
+
+// requires export PYTHONPATH=/path/to/benchmark_scripts/table_generation
+// calls generate_benchmark_files to create tables (feather files) varying in frequency,
+// width, key density for benchmarks. places generated files in benchmark/data. This
+// operation runs once at the beginning of benchmarking.
+static void DoSetup() {
+  if (!createdBenchmarkFiles) {
+    std::error_code err;
+    std::ostringstream call_stream;
+    call_stream << "python3 -m generate_benchmark_files " << temp_dir.get()->path().ToString();
+    boost::process::system(call_stream.str(), err);
+    if (err) {
+      std::cerr << "Could not generate python files." << std::endl;
+      std::cerr << "Error Message: " << err.message() << std::endl;
+    }
+    createdBenchmarkFiles = true;
+  }
+}
+
+static std::vector<std::string> generateRightHandTables(std::string freq, int width_index,
+                                                        int num_tables,
+                                                        int num_ids_index) {
+  auto const generate_file_name = [](std::string freq, std::string is_wide,
+                                     std::string num_ids, std::string num) {
+    return freq + "_" + is_wide + "_" + num_ids + num + ".feather";
+  };
+
+  std::string width_table[] = {"20_cols", "100_cols", "500_cols"};
+  std::string num_ids_table[] = {"100_ids", "5000_ids", "10000_ids"};
+
+  std::vector<std::string> right_hand_tables;
+  right_hand_tables.reserve(num_tables);
+
+  for (int j = 1; j <= num_tables; j++) {
+    right_hand_tables.push_back(generate_file_name(
+        freq, width_table[width_index], num_ids_table[num_ids_index], std::to_string(j)));
+  }
+  return right_hand_tables;
+}
+
+// Wrapper to enable the use of RecordBatchFileReaders as RecordBatchReaders
+class RecordBatchFileReaderWrapper : public arrow::ipc::RecordBatchReader {
+  std::shared_ptr<arrow::ipc::RecordBatchFileReader> _reader;
+  int _next;
+ public:
+  virtual ~RecordBatchFileReaderWrapper() {}
+  explicit RecordBatchFileReaderWrapper(
+      std::shared_ptr<arrow::ipc::RecordBatchFileReader> reader)
+      : _reader(reader), _next(0) {}
+  virtual arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch) {
+    // cout << "ReadNext _next=" << _next << "\n";
+    if (_next < _reader->num_record_batches()) {
+      ARROW_ASSIGN_OR_RAISE(*batch, _reader->ReadRecordBatch(_next++));
+      // cout << "\t --> " << (*batch)->num_rows() << "\n";
+    } else {
+      batch->reset();
+      // cout << "\t --> EOF\n";
+    }
+    return arrow::Status::OK();
+  }
+  virtual std::shared_ptr<arrow::Schema> schema() const { return _reader->schema(); }
+};
+
+static ReaderNodeTableProperties
+make_arrow_ipc_reader_node(std::shared_ptr<arrow::compute::ExecPlan>& plan,

Review Comment:
```suggestion
MakeArrowIpcReaderNode(std::shared_ptr<arrow::compute::ExecPlan>& plan,
```
########## cpp/src/arrow/compute/exec/asof_join_benchmark.cc: ##########
@@ -0,0 +1,456 @@
+static ReaderNodeTableProperties
+make_arrow_ipc_reader_node(std::shared_ptr<arrow::compute::ExecPlan>& plan,
+                           std::shared_ptr<arrow::fs::FileSystem>& fs,
+                           const std::string& filename,
+                           int64_t batch_size) {
+  // TODO: error checking
+  std::shared_ptr<arrow::io::RandomAccessFile> input = *fs->OpenInputFile(filename);
+  std::shared_ptr<arrow::ipc::RecordBatchFileReader> in_reader =
+      *arrow::ipc::RecordBatchFileReader::Open(input);
+  std::shared_ptr<RecordBatchFileReaderWrapper> reader(
+      new RecordBatchFileReaderWrapper(in_reader));
+  auto schema = reader->schema();
+  // we assume there is a time field represented in uint64, a key field of int32, and the
+  // remaining fields are float64.
+  size_t row_size =
+      sizeof(_Float64) * (schema->num_fields() - 2) + sizeof(int64_t) + sizeof(int32_t);

Review Comment:
```suggestion
      sizeof(double) * (schema->num_fields() - 2) + sizeof(int64_t) + sizeof(int32_t);
```
########## cpp/src/arrow/compute/exec/asof_join_benchmark.cc: ##########
@@ -0,0 +1,456 @@
+  auto batch_gen = *arrow::compute::MakeReaderGenerator(
+      std::move(reader), plan->exec_context()->executor());
+  int64_t rows = in_reader->CountRows().ValueOrDie();
+  // cout << "create source("<<filename<<")\n";
+  return {*arrow::compute::MakeExecNode(
+              "source",    // registered type
+              plan.get(),  // execution plan
+              {},          // inputs
+              arrow::compute::SourceNodeOptions(
+                  std::make_shared<arrow::Schema>(*schema),  // options, )
+                  batch_gen)),
+          rows, row_size * rows};
+}
+
+static ReaderNodeTableProperties make_table_source_node(

Review Comment:
```suggestion
static ReaderNodeTableProperties MakeTableSourceNode(
```
########## cpp/src/arrow/compute/exec/asof_join_benchmark.cc: ##########
@@ -0,0 +1,568 @@
+// requires export PYTHONPATH=/path/to/benchmark_scripts/table_generation
+// calls generate_benchmark_files to create tables (feather files) varying in frequency,
+// width, key density for benchmarks. places generated files in benchmark/data. This
+// operation runs once at the beginning of benchmarking.
+static void DoSetup() {
+  if (!createdBenchmarkFiles) {
+    boost::process::system("mkdir benchmark_data/");
+    std::error_code err;
+    boost::process::system("python3 -m generate_benchmark_files ./benchmark_data/", err);
+    if (err) {

Review Comment:
An abort (or `FAIL` call, which is just an abort + print when outside of gtest) would be good for exiting. I think something like `FAIL() << "Could not generate python files: " << err.message()` would be fine.

Not sure about the path to `benchmark_scripts/table_generation`. People running tests will generally be in some kind of copy of the GH repo. So you could just start at CWD and walk up the file tree looking for `cpp/src/benchmark_scripts/table_generation`. Then allow that to be overridden by an environment variable. I'm guessing that would suffice in most cases.
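For illustration, a minimal sketch of the lookup described above, assuming a hypothetical `FindTableGenerationDir` helper and an `ARROW_BENCHMARK_SCRIPTS` override variable (neither is part of the PR):

```cpp
#include <cstdlib>
#include <filesystem>
#include <optional>

// Hypothetical helper (not in the PR): honor an assumed ARROW_BENCHMARK_SCRIPTS
// override first, then walk up from the current working directory looking for
// cpp/src/benchmark_scripts/table_generation.
std::optional<std::filesystem::path> FindTableGenerationDir() {
  if (const char* env = std::getenv("ARROW_BENCHMARK_SCRIPTS")) {
    return std::filesystem::path(env);
  }
  namespace fs = std::filesystem;
  const fs::path relative = "cpp/src/benchmark_scripts/table_generation";
  fs::path dir = fs::current_path();
  while (true) {
    if (fs::exists(dir / relative)) {
      return dir / relative;  // found the scripts inside a checkout
    }
    if (dir == dir.root_path()) {
      return std::nullopt;  // reached the filesystem root; caller can FAIL()/abort
    }
    dir = dir.parent_path();
  }
}
```

DoSetup could then pass the resolved directory to the Python invocation (or set PYTHONPATH from it) and abort if the lookup fails.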
########## cpp/src/arrow/compute/exec/asof_join_benchmark.cc: ##########
@@ -0,0 +1,456 @@
+// Wrapper to enable the use of RecordBatchFileReaders as RecordBatchReaders
+class RecordBatchFileReaderWrapper : public arrow::ipc::RecordBatchReader {
+  std::shared_ptr<arrow::ipc::RecordBatchFileReader> _reader;
+  int _next;

Review Comment:
```suggestion
  std::shared_ptr<arrow::ipc::RecordBatchFileReader> reader_;
  int next_;
```
Perhaps `next_row_group_` or `next_batch_index_`? But `next_` is ok too.
########## cpp/src/arrow/compute/exec/benchmark_scripts/table_generation/batch_process.py: ##########
@@ -0,0 +1,104 @@
+import json
+from datagen import *
+
+from pyarrow import ipc
+from pyarrow.ipc import RecordBatchStreamWriter
+import pyarrow as pa
+import pyarrow.parquet as pq
+
+class BatchStreamRbsWriter:
+    def __init__(self, filename):
+        self.filename = filename
+        self.file = None
+        self.rbswriter = None
+
+    def close(self):
+        if self.rbswriter is not None:
+            self.rbswriter.close()
+        if self.file is not None:
+            self.file.close()
+
+    def process(self, batch_source):
+        if self.file is None:
+            self.file = open(self.filename, "wb")
+        for batch in batch_source:
+            if self.rbswriter is None:
+                self.rbswriter = RecordBatchStreamWriter(self.file, batch.schema)
+            self.rbswriter.write_batch(batch)

Review Comment:
`pyarrow.dataset.write_dataset` accepts an iterable of record batches as its input. Each of these classes could probably be replaced by a single call to `write_dataset`.
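A rough sketch of that suggestion; the function name, schema handling, and output directory below are illustrative only, not taken from the PR:

```python
import pyarrow.dataset as ds

def write_batches(batches, schema, out_dir):
    # write_dataset consumes an iterable of RecordBatch directly; the schema has
    # to be passed explicitly because the iterable cannot be inspected up front.
    ds.write_dataset(batches, out_dir, format="feather", schema=schema)
```

One caveat: `write_dataset` chooses its own file names under `out_dir` (controllable via `basename_template`), so producing the exact single-file `.feather` names the benchmarks expect would still need a little glue.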
########## cpp/src/arrow/compute/exec/asof_join_benchmark.cc: ##########
@@ -0,0 +1,456 @@
+static ReaderNodeTableProperties make_table_source_node(
+    std::shared_ptr<arrow::compute::ExecPlan>& plan,
+    std::shared_ptr<arrow::fs::FileSystem>& fs, const std::string& filename,
+    int64_t batch_size) {
+  // TODO: error checking
+  std::shared_ptr<arrow::io::RandomAccessFile> input = *fs->OpenInputFile(filename);
+  std::shared_ptr<arrow::ipc::RecordBatchFileReader> in_reader =
+      *arrow::ipc::RecordBatchFileReader::Open(input);
+  std::vector<std::shared_ptr<RecordBatch>> record_batches;
+  for (int i = 0; i < in_reader.get()->num_record_batches(); i++) {
+    record_batches.push_back(in_reader.get()->ReadRecordBatch(i).ValueOrDie());
+  }
+  std::shared_ptr<Table> table = Table::FromRecordBatches(record_batches).ValueOrDie();
+  // we assume there is a time field represented in uint64, a key field of int32, and the
+  // remaining fields are float64.
+  size_t row_size = sizeof(_Float64) * (in_reader->schema()->num_fields() - 2) +

Review Comment:
```suggestion
  size_t row_size = sizeof(double) * (in_reader->schema()->num_fields() - 2) +
```
########## cpp/src/arrow/compute/exec/benchmark_scripts/table_generation/datagen.py: ##########
@@ -0,0 +1,868 @@
+class TableMDGenerator:

Review Comment:
Why are there so many TableXyzGenerator types? What is common / different between them all?



########## cpp/src/arrow/compute/exec/benchmark_scripts/table_generation/batch_process.py: ##########
@@ -0,0 +1,104 @@
+class BatchStreamStats:
+    def __init__(self, filename):
+        self.filename = filename
+        self.num_batches = 0
+        self.num_rows = 0
+        self.num_bytes = 0

Review Comment:
Is this used?


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
