bkietz commented on a change in pull request #9533: URL: https://github.com/apache/arrow/pull/9533#discussion_r581379184
########## File path: cpp/src/arrow/util/async_generator.h ########## @@ -177,6 +179,90 @@ class TransformingGenerator { std::shared_ptr<TransformingGeneratorState> state_; }; +template <typename T> +class SerialReadaheadGenerator { + public: + SerialReadaheadGenerator(AsyncGenerator<T> source_generator, int max_readahead) + : state_(std::make_shared<State>(std::move(source_generator), max_readahead)) {} + + Future<T> operator()() { + if (state_->first) { + // Lazy generator, need to wait for the first ask to prime the pump + state_->first = false; + auto next = state_->source(); + return next.Then(Callback{state_}); + } else { + // This generator is not async-reentrant. We won't be called until the last + // future finished so we know there is something in the queue + auto finished = state_->finished.load(); + if (finished && state_->readahead_queue.isEmpty()) { + return Future<T>::MakeFinished(IterationTraits<T>::End()); + } + auto next_ptr = state_->readahead_queue.frontPtr(); + DCHECK(next_ptr != NULLPTR); Review comment: DCHECK may not be used in non-internal headers https://github.com/westonpace/arrow/blob/c28e3933d844ab9ddbf708c243a6f0f18745bf73/cpp/src/arrow/public_api_test.cc#L25-L27 ########## File path: cpp/src/arrow/util/async_generator.h ########## @@ -177,6 +179,90 @@ class TransformingGenerator { std::shared_ptr<TransformingGeneratorState> state_; }; +template <typename T> +class SerialReadaheadGenerator { + public: + SerialReadaheadGenerator(AsyncGenerator<T> source_generator, int max_readahead) + : state_(std::make_shared<State>(std::move(source_generator), max_readahead)) {} + + Future<T> operator()() { + if (state_->first) { + // Lazy generator, need to wait for the first ask to prime the pump + state_->first = false; + auto next = state_->source(); + return next.Then(Callback{state_}); + } else { Review comment: Please space this out to improve readability. Additionally, per the style guide: please avoid redundant `else` blocks ########## File path: cpp/src/arrow/vendored/ProducerConsumerQueue.h ########## @@ -0,0 +1,214 @@ +// Vendored from git tag v2021.02.15.00 + +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// @author Bo Hu (b...@fb.com) +// @author Jordan DeLong (delon...@fb.com) + +#pragma once + +#include <atomic> +#include <cassert> +#include <cstdlib> +#include <memory> +#include <stdexcept> +#include <type_traits> +#include <utility> + +namespace arrow_vendored { +namespace folly { + +// Vendored from folly/Portability.h +namespace { +#if defined(__arm__) +#define FOLLY_ARM 1 +#else +#define FOLLY_ARM 0 +#endif + +#if defined(__s390x__) +#define FOLLY_S390X 1 +#else +#define FOLLY_S390X 0 +#endif + +constexpr bool kIsArchArm = FOLLY_ARM == 1; +constexpr bool kIsArchS390X = FOLLY_S390X == 1; +} // namespace + +// Vendored from folly/lang/Align.h +namespace { + +constexpr std::size_t hardware_destructive_interference_size = + (kIsArchArm || kIsArchS390X) ? 64 : 128; + +} // namespace + +/* + * ProducerConsumerQueue is a one producer and one consumer queue + * without locks. + */ +template <class T> +struct ProducerConsumerQueue { + typedef T value_type; + + ProducerConsumerQueue(const ProducerConsumerQueue&) = delete; + ProducerConsumerQueue& operator=(const ProducerConsumerQueue&) = delete; + + // size must be >= 2. + // + // Also, note that the number of usable slots in the queue at any + // given time is actually (size-1), so if you start with an empty queue, + // isFull() will return true after size-1 insertions. + explicit ProducerConsumerQueue(uint32_t size) + : size_(size), + records_(static_cast<T*>(std::malloc(sizeof(T) * size))), + readIndex_(0), + writeIndex_(0) { + assert(size >= 2); + if (!records_) { + throw std::bad_alloc(); + } + } + + ~ProducerConsumerQueue() { + // We need to destruct anything that may still exist in our queue. + // (No real synchronization needed at destructor time: only one + // thread can be doing this.) + if (!std::is_trivially_destructible<T>::value) { + size_t readIndex = readIndex_; + size_t endIndex = writeIndex_; + while (readIndex != endIndex) { + records_[readIndex].~T(); + if (++readIndex == size_) { + readIndex = 0; + } + } + } + + std::free(records_); + } + + template <class... Args> + bool write(Args&&... recordArgs) { + auto const currentWrite = writeIndex_.load(std::memory_order_relaxed); + auto nextRecord = currentWrite + 1; + if (nextRecord == size_) { + nextRecord = 0; + } + if (nextRecord != readIndex_.load(std::memory_order_acquire)) { + new (&records_[currentWrite]) T(std::forward<Args>(recordArgs)...); + writeIndex_.store(nextRecord, std::memory_order_release); + return true; + } + + // queue is full + return false; + } + + // move (or copy) the value at the front of the queue to given variable + bool read(T& record) { + auto const currentRead = readIndex_.load(std::memory_order_relaxed); + if (currentRead == writeIndex_.load(std::memory_order_acquire)) { + // queue is empty + return false; + } + + auto nextRecord = currentRead + 1; + if (nextRecord == size_) { + nextRecord = 0; + } + record = std::move(records_[currentRead]); + records_[currentRead].~T(); + readIndex_.store(nextRecord, std::memory_order_release); + return true; + } + + // pointer to the value at the front of the queue (for use in-place) or + // nullptr if empty. + T* frontPtr() { Review comment: The difference in naming conventions here is frustrating. Is it worthwhile to rename these to `FrontPtr` etc? ########## File path: cpp/src/arrow/util/iterator_test.cc ########## @@ -726,6 +730,142 @@ TEST(TestAsyncUtil, CompleteBackgroundStressTest) { } } +template <typename T> +class SlowSourcePreventingReentrant { + public: + explicit SlowSourcePreventingReentrant(AsyncGenerator<T> source) + : state_(std::make_shared<State>(std::move(source))) {} + + Future<TestInt> operator()() { + if (state_->in.load()) { + state_->valid.store(false); + } + state_->in.store(true); + auto result = state_->source(); + result.AddCallback(Callback{state_}); + return result; + } + + void AssertValid() { + EXPECT_EQ(true, state_->valid.load()) + << "The generator was accessed in a reentrant manner"; + } + + private: + struct State { + explicit State(AsyncGenerator<T> source_) + : source(std::move(source_)), in(false), valid(true) {} + + AsyncGenerator<T> source; + std::atomic<bool> in; + std::atomic<bool> valid; + }; + struct Callback { + void operator()(const Result<T>& result) { state_->in.store(false); } + std::shared_ptr<State> state_; + }; + + std::shared_ptr<State> state_; +}; + +TEST(TestAsyncUtil, SerialReadaheadSlowProducer) { + AsyncGenerator<TestInt> it = BackgroundAsyncVectorIt({1, 2, 3, 4, 5}); + SlowSourcePreventingReentrant<TestInt> tracker(std::move(it)); + SerialReadaheadGenerator<TestInt> serial_readahead(tracker, 2); + AssertAsyncGeneratorMatch({1, 2, 3, 4, 5}, + static_cast<AsyncGenerator<TestInt>>(serial_readahead)); + tracker.AssertValid(); +} + +TEST(TestAsyncUtil, SerialReadaheadSlowConsumer) { + int num_delivered = 0; + auto source = [&num_delivered]() { + if (num_delivered < 5) { + return Future<TestInt>::MakeFinished(num_delivered++); + } else { + return Future<TestInt>::MakeFinished(IterationTraits<TestInt>::End()); + } + }; + SerialReadaheadGenerator<TestInt> serial_readahead(std::move(source), 3); + SleepABit(); + ASSERT_EQ(0, num_delivered); + ASSERT_FINISHES_OK_AND_ASSIGN(auto next, serial_readahead()); + ASSERT_EQ(0, next.value); + ASSERT_EQ(3, num_delivered); + AssertAsyncGeneratorMatch({1, 2, 3, 4}, + static_cast<AsyncGenerator<TestInt>>(serial_readahead)); +} + +TEST(TestAsyncUtil, SerialReadaheadStress) { + constexpr int NTASKS = 20; + constexpr int NITEMS = 50; + constexpr int EXPECTED_SUM = (NITEMS * (NITEMS - 1)) / 2; + for (int i = 0; i < NTASKS; i++) { + AsyncGenerator<TestInt> it = BackgroundAsyncVectorIt(RangeVector(NITEMS)); + SerialReadaheadGenerator<TestInt> serial_readahead(it, 2); + unsigned int sum = 0; + auto visit_fut = VisitAsyncGenerator<TestInt>( + serial_readahead, [&sum](TestInt test_int) -> Status { + sum += test_int.value; + // Normally sleeping in a visit function would be a faux-pas but we want to slow + // the reader down to match the producer to maximize the stress + std::this_thread::sleep_for(kYieldDuration); + return Status::OK(); + }); + ASSERT_FINISHES_OK(visit_fut); + ASSERT_EQ(EXPECTED_SUM, sum); + } +} + +TEST(TestAsyncUtil, SerialReadaheadStressFast) { + constexpr int NTASKS = 20; + constexpr int NITEMS = 50; + constexpr int EXPECTED_SUM = (NITEMS * (NITEMS - 1)) / 2; + for (int i = 0; i < NTASKS; i++) { + AsyncGenerator<TestInt> it = BackgroundAsyncVectorIt(RangeVector(NITEMS), false); + SerialReadaheadGenerator<TestInt> serial_readahead(it, 2); + unsigned int sum = 0; + auto visit_fut = VisitAsyncGenerator<TestInt>(serial_readahead, + [&sum](TestInt test_int) -> Status { + sum += test_int.value; + return Status::OK(); + }); + ASSERT_FINISHES_OK(visit_fut); + ASSERT_EQ(EXPECTED_SUM, sum); Review comment: IIUC, the intent of this test is to assert that concurrent writes haven't clobbered each other in this sum, thus demonstrating that the visitor was never called re-entrantly. If that's the case, please instead assert that a mutex isn't locked or similar ########## File path: cpp/src/arrow/util/async_generator.h ########## @@ -177,6 +179,90 @@ class TransformingGenerator { std::shared_ptr<TransformingGeneratorState> state_; }; +template <typename T> +class SerialReadaheadGenerator { + public: + SerialReadaheadGenerator(AsyncGenerator<T> source_generator, int max_readahead) + : state_(std::make_shared<State>(std::move(source_generator), max_readahead)) {} + + Future<T> operator()() { + if (state_->first) { + // Lazy generator, need to wait for the first ask to prime the pump + state_->first = false; + auto next = state_->source(); + return next.Then(Callback{state_}); + } else { + // This generator is not async-reentrant. We won't be called until the last + // future finished so we know there is something in the queue + auto finished = state_->finished.load(); + if (finished && state_->readahead_queue.isEmpty()) { + return Future<T>::MakeFinished(IterationTraits<T>::End()); + } + auto next_ptr = state_->readahead_queue.frontPtr(); + DCHECK(next_ptr != NULLPTR); + auto next = std::move(**next_ptr); + state_->readahead_queue.popFront(); + auto last_available = state_->spaces_available.fetch_add(1); + if (last_available == 0 && !finished) { + // Reader idled out, we need to restart it + state_->Pump(state_); + } + return next; + } + } + + private: + struct State { + State(AsyncGenerator<T> source_, int max_readahead) + : first(true), + source(std::move(source_)), + finished(false), + spaces_available(max_readahead), + readahead_queue(max_readahead) {} + + void Pump(std::shared_ptr<State>& self) { + // Can't do readahead_queue.write(source().Then(Callback{self})) because then the + // callback might run immediately and add itself to the queue before this gets added + // to the queue messing up the order + auto next_slot = std::make_shared<Future<T>>(); + auto written = readahead_queue.write(next_slot); + DCHECK(written); + *next_slot = source().Then(Callback{self}); + } + + // Only accessed by the consumer end + bool first; + // Accessed by both threads + AsyncGenerator<T> source; + std::atomic<bool> finished; + std::atomic<uint32_t> spaces_available; + util::SpscQueue<std::shared_ptr<Future<T>>> readahead_queue; + }; + + struct Callback { + Result<T> operator()(const Result<T>& next_result) { Review comment: Nit: ```suggestion Result<T> operator()(const Result<T>& maybe_next) { ``` ########## File path: cpp/src/arrow/util/queue_benchmark.cc ########## @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <algorithm> +#include <iterator> +#include <thread> +#include <vector> + +#include <benchmark/benchmark.h> + +#include "arrow/buffer.h" +#include "arrow/util/queue.h" + +namespace arrow { + +namespace util { + +static constexpr int64_t kSize = 100000; + +void throughput(benchmark::State& state) { Review comment: ```suggestion void SpscQueueThroughput(benchmark::State& state) { ``` ########## File path: cpp/src/arrow/util/async_generator.h ########## @@ -177,6 +179,90 @@ class TransformingGenerator { std::shared_ptr<TransformingGeneratorState> state_; }; +template <typename T> +class SerialReadaheadGenerator { + public: + SerialReadaheadGenerator(AsyncGenerator<T> source_generator, int max_readahead) + : state_(std::make_shared<State>(std::move(source_generator), max_readahead)) {} + + Future<T> operator()() { + if (state_->first) { + // Lazy generator, need to wait for the first ask to prime the pump + state_->first = false; + auto next = state_->source(); + return next.Then(Callback{state_}); + } else { + // This generator is not async-reentrant. We won't be called until the last + // future finished so we know there is something in the queue + auto finished = state_->finished.load(); + if (finished && state_->readahead_queue.isEmpty()) { + return Future<T>::MakeFinished(IterationTraits<T>::End()); + } + auto next_ptr = state_->readahead_queue.frontPtr(); + DCHECK(next_ptr != NULLPTR); + auto next = std::move(**next_ptr); + state_->readahead_queue.popFront(); + auto last_available = state_->spaces_available.fetch_add(1); + if (last_available == 0 && !finished) { + // Reader idled out, we need to restart it + state_->Pump(state_); + } + return next; + } + } + + private: + struct State { + State(AsyncGenerator<T> source_, int max_readahead) + : first(true), + source(std::move(source_)), + finished(false), + spaces_available(max_readahead), + readahead_queue(max_readahead) {} + + void Pump(std::shared_ptr<State>& self) { Review comment: ```suggestion void Pump(const std::shared_ptr<State>& self) { ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org