pitrou commented on code in PR #38147:
URL: https://github.com/apache/arrow/pull/38147#discussion_r1356411588


##########
cpp/src/arrow/acero/hash_join_node_test.cc:
##########
@@ -2124,5 +2124,266 @@ TEST(HashJoin, ChainedIntegerHashJoins) {
   }
 }
 
+// This test case is related to GH-38147
+// To verify that the issue with offset handling has been fixed, the number of matching
+// records needs to be larger than the mini-batch size (1024).
+constexpr uint64_t NUM_MATCH_RECORDS = 1234;
+constexpr uint64_t NUM_LEFT_RECORDS = 2000;
+constexpr uint64_t NUM_RIGHT_RECORDS = 1500;
+
+std::string GenerateTimestamp() {
+  auto now = std::chrono::system_clock::now();
+  auto now_time_t = std::chrono::system_clock::to_time_t(now);
+  auto now_ms =
+      std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()) %
+      1000;
+
+  std::stringstream ss;
+  ss << std::put_time(std::localtime(&now_time_t), "%Y-%m-%d %H:%M:%S");
+  ss << '.' << std::setw(6) << std::setfill('0') << now_ms.count();
+  return ss.str();
+}
+
+std::string GenerateRecord(
+    uint64_t index, const std::string& timestamp, bool useNegativeOne,
+    std::function<std::vector<std::string>(uint64_t)> getExtraValues) {
+  std::stringstream record;
+  record << R"([")" << timestamp << R"(",)";
+  if (useNegativeOne && index >= NUM_MATCH_RECORDS) {
+    record << "\"Not Matched\"";
+  } else {
+    record << "\"" << index + 1 << "\"";
+  }
+  auto extraValues = getExtraValues(index);
+  for (const auto& value : extraValues) {
+    record << R"(, ")" << value << R"(")";
+  }
+  record << R"(])";
+  return record.str();
+}
+
+std::string GenerateLeftRecords(
+    uint64_t recordCount, const std::string& timestamp,
+    std::function<std::vector<std::string>(uint64_t)> getExtraValues) {
+  std::stringstream ss;
+  ss << R"([)";
+  for (uint64_t i = 0; i < recordCount; ++i) {
+    if (i != 0) ss << ",";
+    ss << GenerateRecord(i, timestamp, true, getExtraValues);
+  }
+  ss << R"(])";
+  return ss.str();
+}
+
+std::string GenerateRightRecords(
+    uint64_t recordCount, const std::string& timestamp,
+    std::function<std::vector<std::string>(uint64_t)> getExtraValues) {
+  std::stringstream ss;
+  ss << R"([)";
+  for (uint64_t i = 0; i < recordCount; ++i) {
+    if (i != 0) ss << ",";
+    ss << GenerateRecord(i, timestamp, false, getExtraValues);
+  }
+  ss << R"(])";
+  return ss.str();
+}
+
+std::string GenerateExpectedRecords(
+    uint64_t recordCount, const std::string& timestamp,
+    std::function<std::vector<std::string>(uint64_t)> getLeftExtraValues,
+    std::function<std::vector<std::string>(uint64_t)> getRightExtraValues) {
+  std::stringstream ss;
+  ss << R"([)";
+  for (uint64_t i = 0; i < recordCount; ++i) {
+    if (i >= NUM_MATCH_RECORDS) break;  // Consider only matched records
+
+    if (i != 0) ss << ",";
+
+    auto leftExtraValues = getLeftExtraValues(i);
+    auto rightExtraValues = getRightExtraValues(i);
+
+    // Construct a matched record (inner join result)
+    ss << R"([")" << timestamp << R"(",)"
+       << "\"" << i + 1 << "\"";
+    for (const auto& value : leftExtraValues) {
+      ss << R"(,")" << value << R"(")";
+    }
+    ss << R"(,")" << timestamp << R"(",)"
+       << "\"" << i + 1 << "\"";
+    for (const auto& value : rightExtraValues) {
+      ss << R"(,")" << value << R"(")";
+    }
+    ss << R"(])";
+  }
+  ss << R"(])";
+  return ss.str();
+}
+
+// This test case is related to GH-38147
+// In the previous code, only 1030 matching records are returned.
+TEST(HashJoin, InnerJoinTestLargerThanMiniBatchSize) {
+  // Just as an example, returning "foo" for every record
+  auto leftExtraValues = [](uint64_t index) -> std::vector<std::string> {
+    return {"foo"};
+  };
+
+  // Just as an example, returning "foo" and "bar" for every record
+  auto rightExtraValues = [](uint64_t index) -> std::vector<std::string> {
+    return {"foo", "bar"};
+  };
+
+  std::string currTS = GenerateTimestamp();
+
+  std::string leftRecords =
+      GenerateLeftRecords(NUM_LEFT_RECORDS, currTS, leftExtraValues);
+  std::string rightRecords =
+      GenerateRightRecords(NUM_RIGHT_RECORDS, currTS, rightExtraValues);
+  std::string expectedRecords = GenerateExpectedRecords(
+      NUM_MATCH_RECORDS, currTS, leftExtraValues, rightExtraValues);
+
+  // Initialize left input data with the specific timestamp and large utf8 data
+  BatchesWithSchema input_left;
+  input_left.batches = {ExecBatchFromJSON(
+      {timestamp(TimeUnit::MICRO, "UTC"), large_utf8(), large_utf8()}, leftRecords)};

Review Comment:
   Similar question here. Is using timestamps necessary to surface the bug, or
would it also be reproduced with, e.g., plain integers? Reducing the test
case to a minimal reproducer would be useful for understanding and
maintenance.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to