icexelloss commented on code in PR #13771:
URL: https://github.com/apache/arrow/pull/13771#discussion_r938903559
##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -249,12 +263,23 @@ class InputState {
return updated;
}
- void Push(const std::shared_ptr<arrow::RecordBatch>& rb) {
+ Status Push(const std::shared_ptr<arrow::RecordBatch>& rb) {
if (rb->num_rows() > 0) {
+ int64_t batch_earliest_time =
+ rb->column_data(time_col_index_)->GetValues<int64_t>(1)[0];
+ int64_t batch_latest_time =
+
rb->column_data(time_col_index_)->GetValues<int64_t>(1)[rb->num_rows() - 1];
+ // Batches must be in order
+ if (batch_earliest_time < latest_time_) {
+ return Status::Invalid("Batches out of order.");
Review Comment:
Can you include `batch_earliest_time` and `latest_time_` in the error message?
##########
cpp/src/arrow/compute/exec/asof_join_node_test.cc:
##########
@@ -306,5 +332,10 @@ TEST(AsofJoinTest, TestMissingKeys) {
{field("time", int64()), field("key1", int32()), field("r0_v0",
float64())}));
}
+TEST(AsofJoinTest, SourceInOrderAssertion) {
Review Comment:
Can you rename `SourceInOrderAssertion` to `TestSourceOutOfOrder`?
##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -46,52 +46,66 @@ typedef uint64_t row_index_t;
typedef int col_index_t;
/**
- * Simple implementation for an unbound concurrent queue
+ * Simple implementation for a bounded concurrent queue
*/
template <class T>
-class ConcurrentQueue {
+class ConcurrentBoundedQueue {
+ size_t remaining_;
+ std::vector<T> buffer_;
Review Comment:
Can you move these variables to the `private` member section at the bottom?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]