westonpace commented on a change in pull request #9095: URL: https://github.com/apache/arrow/pull/9095#discussion_r564834775
########## File path: cpp/src/arrow/util/iterator.h ########## @@ -186,6 +187,122 @@ class Iterator : public util::EqualityComparable<Iterator<T>> { Result<T> (*next_)(void*) = NULLPTR; }; +template <typename T> +struct TransformFlow { + using YieldValueType = T; + + TransformFlow(YieldValueType value, bool ready_for_next) + : finished_(false), + ready_for_next_(ready_for_next), + yield_value_(std::move(value)) {} + TransformFlow(bool finished, bool ready_for_next) + : finished_(finished), ready_for_next_(ready_for_next), yield_value_() {} + + bool HasValue() const { return yield_value_.has_value(); } + bool Finished() const { return finished_; } + bool ReadyForNext() const { return ready_for_next_; } + T Value() const { return *yield_value_; } + + bool finished_ = false; + bool ready_for_next_ = false; + util::optional<YieldValueType> yield_value_; +}; + +struct TransformFinish { + template <typename T> + operator TransformFlow<T>() && { // NOLINT explicit + return TransformFlow<T>(true, true); + } +}; + +struct TransformSkip { + template <typename T> + operator TransformFlow<T>() && { // NOLINT explicit + return TransformFlow<T>(false, true); + } +}; + +template <typename T> +TransformFlow<T> TransformYield(T value = {}, bool ready_for_next = true) { + return TransformFlow<T>(std::move(value), ready_for_next); +} + +template <typename T, typename V> +using Transformer = std::function<Result<TransformFlow<V>>(T)>; + +template <typename T, typename V> +class TransformIterator { + public: + explicit TransformIterator(Iterator<T> it, Transformer<T, V> transformer) + : it_(std::move(it)), + transformer_(std::move(transformer)), + last_value_(), + finished_() {} + + // Calls the transform function on the current value. Can return in several ways + // * If the next value is requested (e.g. 
skip) it will return an empty optional + // * If an invalid status is encountered that will be returned + // * If finished it will return IterationTraits<V>::End() + // * If a value is returned by the transformer that will be returned + Result<util::optional<V>> Pump() { + if (!finished_ && last_value_.has_value()) { + ARROW_ASSIGN_OR_RAISE(TransformFlow<V> next, transformer_(*last_value_)); + if (next.ReadyForNext()) { + if (*last_value_ == IterationTraits<T>::End()) { + finished_ = true; + } + last_value_.reset(); + } + if (next.Finished()) { + finished_ = true; + } + if (next.HasValue()) { + return next.Value(); + } + } + if (finished_) { + return IterationTraits<V>::End(); + } + return util::nullopt; + } + + Result<V> Next() { + while (!finished_) { + ARROW_ASSIGN_OR_RAISE(util::optional<V> next, Pump()); + if (next.has_value()) { + return *next; + } + ARROW_ASSIGN_OR_RAISE(last_value_, it_.Next()); + } + return IterationTraits<V>::End(); + } + + private: + Iterator<T> it_; + Transformer<T, V> transformer_; + util::optional<T> last_value_; + bool finished_ = false; +}; + +/// \brief Transforms an iterator according to a transformer, returning a new Iterator. +/// +/// The transformer will be called on each element of the source iterator and for each +/// call it can yield a value, skip, or finish the iteration. When yielding a value the +/// transformer can choose to consume the source item (the default, ready_for_next = true) +/// or to keep it and it will be called again on the same value. Review comment: A decompression iterator. If the iterator consumes 1MB compressed blocks and outputs 1MB decompressed blocks, it will likely need to output N blocks for each input block. Or a flatMap operator which takes in lists and outputs elements. For example, in a dataset situation you could conceivably encounter `AsyncIterator<Dataset> -> AsyncIterator<Directory> -> AsyncIterator<File> -> AsyncIterator<Segment>`. Each element in the chain would need to output multiple elements for each input. 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org