This is an automated email from the ASF dual-hosted git repository.

raulcd pushed a commit to branch maint-13.0.0
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit f5a4f12ffb9b77a4a89138a1fb81d63cc7a32884
Author: rtpsw <[email protected]>
AuthorDate: Wed Jul 12 22:58:30 2023 +0300

    GH-36482: [C++][CI] Fix sporadic test failures in AsofJoinBasicTest (#36499)
    
    ### What changes are included in this PR?
    
    With this change, the key hasher is invalidated before the first invocation
    of `GetKey` (via `GetLatestKey`) after a new batch arrives. In the pre-PR
    code, this invalidation happened within `Advance`, which is called from
    `AdvanceAndMemoize` only after `GetLatestKey` has been called, so `GetKey`
    could still use the key hasher's cache from the previous batch. The change
    adds synchronization between the input-receiving and processing threads,
    because avoiding it would require a more complicated and brittle change,
    e.g., one that involves detecting in the processing thread when a ne [...]
    
    ### Are these changes tested?
    
    Yes, by existing tests.
    
    ### Are there any user-facing changes?
    
    No.
    
    **This PR contains a "Critical Fix".**
    * Closes: #36482
    
    Authored-by: Yaron Gvili <[email protected]>
    Signed-off-by: Weston Pace <[email protected]>
---
 cpp/src/arrow/acero/asof_join_node.cc | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/cpp/src/arrow/acero/asof_join_node.cc 
b/cpp/src/arrow/acero/asof_join_node.cc
index 98e5918ebb..b7f5d878e5 100644
--- a/cpp/src/arrow/acero/asof_join_node.cc
+++ b/cpp/src/arrow/acero/asof_join_node.cc
@@ -524,7 +524,7 @@ class KeyHasher {
   size_t index_;
   std::vector<col_index_t> indices_;
   std::vector<KeyColumnMetadata> metadata_;
-  const RecordBatch* batch_;
+  std::atomic<const RecordBatch*> batch_;
   std::vector<HashType> hashes_;
   LightContext ctx_;
   std::vector<KeyColumnArray> column_arrays_;
@@ -819,7 +819,6 @@ class InputState {
         have_active_batch &= !queue_.TryPop();
         if (have_active_batch) {
           DCHECK_GT(queue_.UnsyncFront()->num_rows(), 0);  // empty batches 
disallowed
-          key_hasher_->Invalidate();  // batch changed - invalidate key 
hasher's cache
           memo_.UpdateTime(GetTime(queue_.UnsyncFront().get(), 0));  // time 
changed
         }
       }
@@ -897,7 +896,8 @@ class InputState {
 
   Status Push(const std::shared_ptr<arrow::RecordBatch>& rb) {
     if (rb->num_rows() > 0) {
-      queue_.Push(rb);  // only after above updates - push batch for processing
+      key_hasher_->Invalidate();  // batch changed - invalidate key hasher's 
cache
+      queue_.Push(rb);            // only now push batch for processing
     } else {
       ++batches_processed_;  // don't enqueue empty batches, just record as 
processed
     }

Reply via email to