rtpsw commented on PR #13880:
URL: https://github.com/apache/arrow/pull/13880#issuecomment-1235474550
For posterity, I also made an attempt to improve performance by refactoring
`AdvanceAndMemoize` for the different latest-key paths, but so far this has
resulted in slightly decreased performance.
```
$ archery benchmark diff ARROW-17412.template-memoize.json baseline.json
INFO:numexpr.utils:NumExpr defaulting to 8 threads.

Non-regressions: (1)

benchmark
baseline contender change %
counters
AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:32000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:500/real_time
142.364 MiB/sec 166.963 MiB/sec 17.279 {'family_index': 0,
'per_family_instance_index': 14, 'run_name':
'AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:32000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:500/real_time',
'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 7,
'input_rows_per_second': 867904.5056496062, 'maximum_peak_memory': 686301504.0}

Regressions: (10)

benchmark
baseline contender change %
counters
AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:10/right_freq:400/right_cols:20/right_ids:500/real_time
344.387 MiB/sec 300.780 MiB/sec -12.662 {'family_index': 0,
'per_family_instance_index': 10, 'run_name':
'AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:10/right_freq:400/right_cols:20/right_ids:500/real_time',
'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 3,
'input_rows_per_second': 2099512.4178041373, 'maximum_peak_memory': 148407104.0}
AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:1000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:500/real_time
315.636 MiB/sec 269.865 MiB/sec -14.501 {'family_index': 0,
'per_family_instance_index': 12, 'run_name':
'AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:1000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:500/real_time',
'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 16,
'input_rows_per_second': 1924236.4216933611, 'maximum_peak_memory': 686301504.0}
AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:50/right_freq:400/right_cols:20/right_ids:500/real_time
323.553 MiB/sec 275.253 MiB/sec -14.928 {'family_index': 0,
'per_family_instance_index': 11, 'run_name':
'AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:50/right_freq:400/right_cols:20/right_ids:500/real_time',
'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 1,
'input_rows_per_second': 1972499.868090142, 'maximum_peak_memory': 686301504.0}
AsOfJoinOverhead/left_freq:400/left_cols:100/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:100/right_ids:500/real_time
404.655 MiB/sec 336.061 MiB/sec -16.951 {'family_index': 0,
'per_family_instance_index': 5, 'run_name':
'AsOfJoinOverhead/left_freq:400/left_cols:100/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:100/right_ids:500/real_time',
'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 4,
'input_rows_per_second': 522550.5184384509, 'maximum_peak_memory': 131071104.0}
AsOfJoinOverhead/left_freq:200/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:200/right_cols:20/right_ids:500/real_time
355.718 MiB/sec 293.828 MiB/sec -17.399 {'family_index': 0,
'per_family_instance_index': 0, 'run_name':
'AsOfJoinOverhead/left_freq:200/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:200/right_cols:20/right_ids:500/real_time',
'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 8,
'input_rows_per_second': 2168591.3894433207, 'maximum_peak_memory': 54420864.0}
AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:100/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:100/real_time
339.172 MiB/sec 277.452 MiB/sec -18.197 {'family_index': 0,
'per_family_instance_index': 6, 'run_name':
'AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:100/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:100/real_time',
'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 84,
'input_rows_per_second': 2067719.28348546, 'maximum_peak_memory': 131071104.0}
AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:1000/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:1000/real_time
351.580 MiB/sec 285.147 MiB/sec -18.896 {'family_index': 0,
'per_family_instance_index': 8, 'run_name':
'AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:1000/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:1000/real_time',
'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 9,
'input_rows_per_second': 2143364.0434252876, 'maximum_peak_memory': 131071104.0}
AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:500/real_time
349.482 MiB/sec 282.849 MiB/sec -19.066 {'family_index': 0,
'per_family_instance_index': 9, 'run_name':
'AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:500/real_time',
'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 19,
'input_rows_per_second': 2111813.458550057, 'maximum_peak_memory': 131071104.0}
AsOfJoinOverhead/left_freq:1000/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:1000/right_cols:20/right_ids:500/real_time
348.038 MiB/sec 278.493 MiB/sec -19.982 {'family_index': 0,
'per_family_instance_index': 2, 'run_name':
'AsOfJoinOverhead/left_freq:1000/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:1000/right_cols:20/right_ids:500/real_time',
'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 44,
'input_rows_per_second': 2121769.8288677963, 'maximum_peak_memory': 54420864.0}
AsOfJoinOverhead/left_freq:400/left_cols:10/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:10/right_ids:500/real_time
305.968 MiB/sec 223.128 MiB/sec -27.075 {'family_index': 0,
'per_family_instance_index': 3, 'run_name':
'AsOfJoinOverhead/left_freq:400/left_cols:10/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:10/right_ids:500/real_time',
'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 29,
'input_rows_per_second': 3487291.181883503, 'maximum_peak_memory': 54420864.0}
```
The code diff for this benchmark is:
```
diff --git a/cpp/src/arrow/compute/exec/asof_join_node.cc
b/cpp/src/arrow/compute/exec/asof_join_node.cc
index d4e704ba6..eb13ba5a7 100644
--- a/cpp/src/arrow/compute/exec/asof_join_node.cc
+++ b/cpp/src/arrow/compute/exec/asof_join_node.cc
@@ -302,6 +302,10 @@ class InputState {
if (key_col_index_.size() == 0) {
return 0;
}
+ return GetLatestKeyFastPath(batch, row);
+ }
+
+ inline ByType GetLatestKeyFastPath(const RecordBatch* batch, row_index_t
row) const {
auto data = batch->column_data(key_col_index_[0]);
switch (key_type_id_[0]) {
LATEST_VAL_CASE(INT8, key_value)
@@ -382,7 +386,8 @@ class InputState {
// latest_time and latest_ref_row to the value that immediately pass the
// specified timestamp.
// Returns true if updates were made, false if not.
- Result<bool> AdvanceAndMemoize(OnType ts) {
+ template <typename LatestKey>
+ Result<bool> AdvanceAndMemoize(OnType ts, LatestKey get_latest_key) {
// Advance the right side row index until we reach the latest right row
(for each key)
// for the given left timestamp.
@@ -405,14 +410,33 @@ class InputState {
must_hash_ = true;
may_rehash_ = false;
Rehash();
+ break;
}
- memo_.Store(rb, latest_ref_row_, latest_time, GetLatestKey());
+ memo_.Store(rb, latest_ref_row_, latest_time, get_latest_key());
updated = true;
ARROW_ASSIGN_OR_RAISE(advanced, Advance());
} while (advanced);
return updated;
}
+ Result<bool> AdvanceAndMemoize(OnType ts) {
+ if (key_col_index_.size() == 0) {
+ return AdvanceAndMemoize(ts, []() { return 0; });
+ }
+ if (must_hash_) {
+ return AdvanceAndMemoize(ts, [this]() { return
key_hasher_->HashesFor(queue_.UnsyncFront().get())[latest_ref_row_]; });
+ }
+ ARROW_ASSIGN_OR_RAISE(bool updated,
+ AdvanceAndMemoize(ts, [this]() { return
GetLatestKeyFastPath(queue_.UnsyncFront().get(), latest_ref_row_); }));
+ if (must_hash_) {
+ ARROW_ASSIGN_OR_RAISE(
+ bool updated_after_rehash,
+ AdvanceAndMemoize(ts, [this]() { return
key_hasher_->HashesFor(queue_.UnsyncFront().get())[latest_ref_row_]; }));
+ return updated || updated_after_rehash;
+ }
+ return updated;
+ }
+
void Rehash() {
MemoStore new_memo;
for (const auto& entry : memo_.entries_) {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]