zanmato1984 opened a new issue, #45334:
URL: https://github.com/apache/arrow/issues/45334

   ### Describe the bug, including details regarding any error messages, 
version, and platform.
   
   After fixing #44513 , I was further exploring how far we can go in terms of 
overflow-safe. I enlarged the scale of the case in #44513 as followed:
   
   <details>
   
   ```
   TEST(HashJoin, LARGE_MEMORY_TEST(BuildSidePayloadOver4GB)) {
     const int64_t num_match_rows = 32;
     const int64_t num_rows_per_match_batch = 32;
     const int64_t num_match_batches = num_match_rows / 
num_rows_per_match_batch;
   
     const int64_t num_unmatch_rows_large = 720898048;
     const int64_t num_rows_per_unmatch_batch_large = 352001;
     const int64_t num_unmatch_batches_large =
         num_unmatch_rows_large / num_rows_per_unmatch_batch_large;
   
     auto schema_small = schema({field("small_key0", int64()), 
field("small_key1", int64()),
                                 field("small_key2", int64())});
     auto schema_large =
         schema({field("large_key0", int64()), field("large_key1", int64()),
                 field("large_key2", int64()), field("large_payload", 
int64())});
   
     const int64_t match_key0 = static_cast<int64_t>(88506230299);
     const int64_t match_key1 = static_cast<int64_t>(16556030299);
     const int64_t match_key2 = 11240299;
     const int64_t match_payload = 42;
   
     // Match arrays of length num_rows_per_match_batch.
     ASSERT_OK_AND_ASSIGN(
         auto match_key0_arr,
         Constant(MakeScalar(match_key0))->Generate(num_rows_per_match_batch));
     ASSERT_OK_AND_ASSIGN(
         auto match_key1_arr,
         Constant(MakeScalar(match_key1))->Generate(num_rows_per_match_batch));
     ASSERT_OK_AND_ASSIGN(
         auto match_key2_arr,
         Constant(MakeScalar(match_key2))->Generate(num_rows_per_match_batch));
     ASSERT_OK_AND_ASSIGN(
         auto match_payload_arr,
         
Constant(MakeScalar(match_payload))->Generate(num_rows_per_match_batch));
   
     // Small batch.
     ExecBatch batch_small({match_key0_arr, match_key1_arr, match_key2_arr},
                           num_rows_per_match_batch);
   
     // Large unmatch batch.
     const int64_t seed = 42;
     auto unmatch_key_arr_large = RandomArrayGenerator(seed).Int64(
         num_rows_per_unmatch_batch_large, /*min=*/match_key2 + 1,
         /*max=*/match_key2 + 1 + 8);
     ASSERT_OK_AND_ASSIGN(auto unmatch_payload_arr_large,
                          MakeArrayOfNull(int64(), 
num_rows_per_unmatch_batch_large));
     ExecBatch unmatch_batch_large({unmatch_key_arr_large, 
unmatch_key_arr_large,
                                    unmatch_key_arr_large, 
unmatch_payload_arr_large},
                                   num_rows_per_unmatch_batch_large);
     // Large match batch.
     ExecBatch match_batch_large(
         {match_key0_arr, match_key1_arr, match_key2_arr, match_payload_arr},
         num_rows_per_match_batch);
   
     // Batches with schemas.
     auto batches_small = BatchesWithSchema{
         std::vector<ExecBatch>(num_match_batches, batch_small), schema_small};
     auto batches_large = BatchesWithSchema{
         std::vector<ExecBatch>(num_unmatch_batches_large, unmatch_batch_large),
         schema_large};
     for (int i = 0; i < num_match_batches; i++) {
       batches_large.batches.push_back(match_batch_large);
     }
   
     Declaration source_small{
         "exec_batch_source",
         ExecBatchSourceNodeOptions(batches_small.schema, 
batches_small.batches)};
     Declaration source_large{
         "exec_batch_source",
         ExecBatchSourceNodeOptions(batches_large.schema, 
batches_large.batches)};
   
     HashJoinNodeOptions join_opts(
         JoinType::INNER,
         /*left_keys=*/{"small_key0", "small_key1", "small_key2"},
         /*right_keys=*/{"large_key0", "large_key1", "large_key2"});
     Declaration join{
         "hashjoin", {std::move(source_small), std::move(source_large)}, 
join_opts};
   
     // Join should emit num_match_rows * num_match_rows rows.
     ASSERT_OK_AND_ASSIGN(auto batches_result, 
DeclarationToExecBatches(std::move(join)));
     Declaration result{"exec_batch_source",
                        
ExecBatchSourceNodeOptions(std::move(batches_result.schema),
                                                   
std::move(batches_result.batches))};
     AssertRowCountEq(result, num_match_rows * num_match_rows);
   
     // The payload should all be match_payload.
     auto predicate = equal(field_ref("large_payload"), literal(match_payload));
     Declaration filter{"filter", {result}, 
FilterNodeOptions{std::move(predicate)}};
     AssertRowCountEq(std::move(filter), num_match_rows * num_match_rows);
   }
   ```
   </details>
   
   And three more overflow issues are identified:
   1. 
https://github.com/apache/arrow/blob/12f62653c825fbf305bfde61c112d2aa69203c62/cpp/src/arrow/acero/swiss_join_internal.h#L120
   2. 
https://github.com/apache/arrow/blob/12f62653c825fbf305bfde61c112d2aa69203c62/cpp/src/arrow/acero/swiss_join_internal.h#L150
   3. 
https://github.com/apache/arrow/blob/12f62653c825fbf305bfde61c112d2aa69203c62/cpp/src/arrow/acero/swiss_join_avx2.cc#L247
   
   All are using 32-bit multiplication and if the `row_id` is big enough (but 
still valid), e.g., `>= 0x20000000`, overflow happens.
   
   ### Component(s)
   
   C++


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to