westonpace commented on a change in pull request #9095:
URL: https://github.com/apache/arrow/pull/9095#discussion_r565963804
##########
File path: cpp/src/arrow/util/iterator.h
##########
@@ -428,10 +547,16 @@ struct ReadaheadIteratorPromise : ReadaheadPromise {
explicit ReadaheadIteratorPromise(Iterator<T>* it) : it_(it) {}
- void Call() override {
+ bool Call() override {
assert(!called_);
out_ = it_->Next();
called_ = true;
+ return out_ == IterationTraits<T>::End();
+ }
+
+ void End() override {
+ // No need to do anything for the synchronous case. No one is waiting on this
+ // called_ = true;
Review comment:
Holdover from debugging. I have uncommented this line.
##########
File path: cpp/src/arrow/util/iterator_test.cc
##########
@@ -129,11 +130,44 @@ template <typename T>
inline Iterator<T> EmptyIt() {
return MakeEmptyIterator<T>();
}
-
inline Iterator<TestInt> VectorIt(std::vector<TestInt> v) {
return MakeVectorIterator<TestInt>(std::move(v));
}
+std::function<Future<TestInt>()> AsyncVectorIt(std::vector<TestInt> v) {
Review comment:
Done.
##########
File path: cpp/src/arrow/util/iterator_test.cc
##########
@@ -214,6 +255,270 @@ TEST(TestVectorIterator, RangeForLoop) {
ASSERT_EQ(ints_it, ints.end());
}
+template <typename T>
+Transformer<T, T> MakeFirstN(int n) {
+ int remaining = n;
+ return [remaining](T next) mutable -> Result<TransformFlow<T>> {
+ if (remaining > 0) {
+ remaining--;
+ return TransformYield(next);
+ }
+ return TransformFinish();
+ };
+}
+
+TEST(TestIteratorTransform, Truncating) {
+ auto original = VectorIt({1, 2, 3});
+ auto truncated = MakeTransformedIterator(std::move(original),
MakeFirstN<TestInt>(2));
+ AssertIteratorMatch({1, 2}, std::move(truncated));
+}
+
+TEST(TestIteratorTransform, TestPointer) {
+ auto original = VectorIt<std::shared_ptr<int>>(
+ {std::make_shared<int>(1), std::make_shared<int>(2),
std::make_shared<int>(3)});
+ auto truncated =
+ MakeTransformedIterator(std::move(original),
MakeFirstN<std::shared_ptr<int>>(2));
+ ASSERT_OK_AND_ASSIGN(auto result, truncated.ToVector());
+ ASSERT_EQ(2, result.size());
+}
+
+TEST(TestIteratorTransform, TruncatingShort) {
+ // Tests the failsafe case where we never call Finish
+ auto original = VectorIt({1});
+ auto truncated = MakeTransformedIterator<TestInt,
TestInt>(std::move(original),
+
MakeFirstN<TestInt>(2));
+ AssertIteratorMatch({1}, std::move(truncated));
+}
+
+TEST(TestAsyncUtil, Background) {
+ std::vector<TestInt> expected = {1, 2, 3};
+ auto background = BackgroundAsyncVectorIt(expected);
+ auto future = CollectAsyncGenerator(background);
+ ASSERT_FALSE(future.is_finished());
+ future.Wait();
+ ASSERT_TRUE(future.is_finished());
+ ASSERT_EQ(expected, *future.result());
+}
+
+struct SlowEmptyIterator {
+ Result<TestInt> Next() {
+ if (called_) {
+ return Status::Invalid("Should not have been called twice");
+ }
+ SleepFor(0.1);
+ return IterationTraits<TestInt>::End();
+ }
+
+ private:
+ bool called_ = false;
+};
+
+TEST(TestAsyncUtil, BackgroundRepeatEnd) {
+ // Ensure that the background iterator properly fulfills the asyncgenerator contract
+ // and can be called after it ends.
+ auto iterator = Iterator<TestInt>(SlowEmptyIterator());
+ ASSERT_OK_AND_ASSIGN(
+ auto background_iter,
+ MakeBackgroundIterator(std::move(iterator),
internal::GetCpuThreadPool()));
+
+ auto one = background_iter();
+ auto two = background_iter();
+
+ ASSERT_TRUE(one.Wait(0.5));
+
+ if (one.is_finished()) {
Review comment:
Changed to ASSERT_FINISHES...
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org