lidavidm commented on a change in pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#discussion_r667117812



##########
File path: cpp/src/arrow/testing/matchers.h
##########
@@ -138,40 +179,62 @@ class StatusMatcher {
   }
 
  private:
-  static const Status& GetStatus(const Status& status) { return status; }
+  const StatusCode code_;
+  const util::optional<testing::Matcher<std::string>> message_matcher_;
+};
 
-  template <typename T>
-  static const Status& GetStatus(const Result<T>& maybe_value) {
-    return maybe_value.status();
-  }
+class OkMatcher {
+ public:
+  template <typename Res>
+  operator testing::Matcher<Res>() const {  // NOLINT runtime/explicit
+    struct Impl : testing::MatcherInterface<const Res&> {
+      void DescribeTo(::std::ostream* os) const override { *os << "is ok"; }
 
-  template <typename T>
-  static const Status& GetStatus(const Future<T>& value_fut) {
-    return value_fut.status();
-  }
+      void DescribeNegationTo(::std::ostream* os) const override { *os << "is not ok"; }
 
-  const StatusCode code_;
-  const util::optional<testing::Matcher<std::string>> message_matcher_;
+      bool MatchAndExplain(const Res& maybe_value,
+                           testing::MatchResultListener* listener) const override {
+        const Status& status = internal::GenericToStatus(maybe_value);
+        testing::StringMatchResultListener value_listener;
+
+        const bool match = status.ok();
+        *listener << "whose value " << testing::PrintToString(status.ToString())
+                  << (match ? " matches" : " doesn't match");
+        testing::internal::PrintIfNotEmpty(value_listener.str(), listener->stream());
+        return match;
+      }
+    };
+
+    return testing::Matcher<Res>(new Impl());
+  }
 };
 
-// Returns a matcher that matches the value of a successful Result<T> or Future<T>.
-// (Future<T> will be waited upon to acquire its result for matching.)
+// Returns a matcher that waits on a Future (by default for 16 seconds)

Review comment:
       nit: say "kDefaultAssertFinishesWaitSeconds seconds" here instead of hard-coding 16?

##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -39,11 +39,13 @@ namespace compute {
 namespace {
 
 struct ExecPlanImpl : public ExecPlan {
-  ExecPlanImpl() = default;
+  explicit ExecPlanImpl(ExecContext* exec_context) : ExecPlan(exec_context) {}
 
   ~ExecPlanImpl() override {
-    if (started_ && !stopped_) {
+    if (started_ && !finished_.is_finished()) {

Review comment:
       This is slightly race-prone (a time-of-check-to-time-of-use issue), right? Though the consequences aren't big here.

##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -282,20 +305,21 @@ struct SourceNode : ExecNode {
 
   void StopProducing(ExecNode* output) override {
     DCHECK_EQ(output, outputs_[0]);
-    {
-      std::unique_lock<std::mutex> lock(mutex_);
-      finished_ = true;
-    }
-    finished_fut_.Wait();
+    StopProducing();
   }
 
-  void StopProducing() override { StopProducing(outputs_[0]); }
+  void StopProducing() override {
+    std::unique_lock<std::mutex> lock(mutex_);
+    stop_requested_ = true;
+  }
+
+  Future<> finished() override { return finished_; }
 
  private:
   std::mutex mutex_;
-  bool finished_{false};
-  int next_batch_index_{0};
-  Future<> finished_fut_ = Future<>::MakeFinished();
+  bool stop_requested_{false};

Review comment:
       Filed ARROW-13297.

##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -220,58 +240,61 @@ struct SourceNode : ExecNode {
 
   const char* kind_name() override { return "SourceNode"; }
 
-  static void NoInputs() { DCHECK(false) << "no inputs; this should never be called"; }
-  void InputReceived(ExecNode*, int, ExecBatch) override { NoInputs(); }
-  void ErrorReceived(ExecNode*, Status) override { NoInputs(); }
-  void InputFinished(ExecNode*, int) override { NoInputs(); }
+  [[noreturn]] static void NoInputs() {
+    DCHECK(false) << "no inputs; this should never be called";
+    std::abort();
+  }
+  [[noreturn]] void InputReceived(ExecNode*, int, ExecBatch) override { NoInputs(); }
+  [[noreturn]] void ErrorReceived(ExecNode*, Status) override { NoInputs(); }
+  [[noreturn]] void InputFinished(ExecNode*, int) override { NoInputs(); }
 
   Status StartProducing() override {
-    if (finished_) {
-      return Status::Invalid("Restarted SourceNode '", label(), "'");
+    DCHECK(!stop_requested_) << "Restarted SourceNode";

Review comment:
       Should we not still return a Status here? If there is a bug, returning an error instead of letting internal state get trampled might save us from odd bug reports.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to