westonpace commented on a change in pull request #9095:
URL: https://github.com/apache/arrow/pull/9095#discussion_r565963530
##########
File path: cpp/src/arrow/util/iterator.h
##########
@@ -186,6 +187,122 @@ class Iterator : public
util::EqualityComparable<Iterator<T>> {
Result<T> (*next_)(void*) = NULLPTR;
};
+template <typename T>
+struct TransformFlow {
+ using YieldValueType = T;
+
+ TransformFlow(YieldValueType value, bool ready_for_next)
+ : finished_(false),
+ ready_for_next_(ready_for_next),
+ yield_value_(std::move(value)) {}
+ TransformFlow(bool finished, bool ready_for_next)
+ : finished_(finished), ready_for_next_(ready_for_next), yield_value_() {}
+
+ bool HasValue() const { return yield_value_.has_value(); }
+ bool Finished() const { return finished_; }
+ bool ReadyForNext() const { return ready_for_next_; }
+ T Value() const { return *yield_value_; }
+
+ bool finished_ = false;
+ bool ready_for_next_ = false;
+ util::optional<YieldValueType> yield_value_;
+};
+
+struct TransformFinish {
+ template <typename T>
+ operator TransformFlow<T>() && { // NOLINT explicit
+ return TransformFlow<T>(true, true);
+ }
+};
+
+struct TransformSkip {
+ template <typename T>
+ operator TransformFlow<T>() && { // NOLINT explicit
+ return TransformFlow<T>(false, true);
+ }
+};
+
+template <typename T>
+TransformFlow<T> TransformYield(T value = {}, bool ready_for_next = true) {
+ return TransformFlow<T>(std::move(value), ready_for_next);
+}
+
+template <typename T, typename V>
+using Transformer = std::function<Result<TransformFlow<V>>(T)>;
+
+template <typename T, typename V>
+class TransformIterator {
+ public:
+ explicit TransformIterator(Iterator<T> it, Transformer<T, V> transformer)
+ : it_(std::move(it)),
+ transformer_(std::move(transformer)),
+ last_value_(),
+ finished_() {}
+
+ // Calls the transform function on the current value. Can return in several
ways
+ // * If the next value is requested (e.g. skip) it will return an empty
optional
+ // * If an invalid status is encountered that will be returned
+ // * If finished it will return IterationTraits<V>::End()
+ // * If a value is returned by the transformer that will be returned
+ Result<util::optional<V>> Pump() {
+ if (!finished_ && last_value_.has_value()) {
+ ARROW_ASSIGN_OR_RAISE(TransformFlow<V> next, transformer_(*last_value_));
+ if (next.ReadyForNext()) {
+ if (*last_value_ == IterationTraits<T>::End()) {
+ finished_ = true;
+ }
+ last_value_.reset();
+ }
+ if (next.Finished()) {
+ finished_ = true;
+ }
+ if (next.HasValue()) {
+ return next.Value();
+ }
+ }
+ if (finished_) {
+ return IterationTraits<V>::End();
+ }
+ return util::nullopt;
+ }
+
+ Result<V> Next() {
+ while (!finished_) {
+ ARROW_ASSIGN_OR_RAISE(util::optional<V> next, Pump());
+ if (next.has_value()) {
+ return *next;
+ }
+ ARROW_ASSIGN_OR_RAISE(last_value_, it_.Next());
+ }
+ return IterationTraits<V>::End();
+ }
+
+ private:
+ Iterator<T> it_;
+ Transformer<T, V> transformer_;
+ util::optional<T> last_value_;
+ bool finished_ = false;
+};
+
+/// \brief Transforms an iterator according to a transformer, returning a new
Iterator.
+///
+/// The transformer will be called on each element of the source iterator and
for each
+/// call it can yield a value, skip, or finish the iteration. When yielding a
value the
+/// transformer can choose to consume the source item (the default,
ready_for_next = true)
+/// or to keep it and it will be called again on the same value.
Review comment:
It would be possible to create an async generator that operated in this
way. However, I'm not able to come up with a "transform" style factory that
wraps a single function and allows that function to be similarly transformed by
synchronous iterators.
Since the only iterators I truly need to be "transform functions" are the
CSV block reader and chunker (to avoid duplication) I could drop the
ready_for_next (this also prevents the resulting generator from being async
re-entrant) and decompressor style generators would have to use some other
mechanism for creation (and I just wouldn't create a synchronous counterpart).
##########
File path: cpp/src/arrow/util/iterator.h
##########
@@ -186,6 +187,122 @@ class Iterator : public
util::EqualityComparable<Iterator<T>> {
Result<T> (*next_)(void*) = NULLPTR;
};
+template <typename T>
+struct TransformFlow {
+ using YieldValueType = T;
+
+ TransformFlow(YieldValueType value, bool ready_for_next)
+ : finished_(false),
+ ready_for_next_(ready_for_next),
+ yield_value_(std::move(value)) {}
+ TransformFlow(bool finished, bool ready_for_next)
+ : finished_(finished), ready_for_next_(ready_for_next), yield_value_() {}
+
+ bool HasValue() const { return yield_value_.has_value(); }
+ bool Finished() const { return finished_; }
+ bool ReadyForNext() const { return ready_for_next_; }
+ T Value() const { return *yield_value_; }
+
+ bool finished_ = false;
+ bool ready_for_next_ = false;
+ util::optional<YieldValueType> yield_value_;
+};
+
+struct TransformFinish {
+ template <typename T>
+ operator TransformFlow<T>() && { // NOLINT explicit
+ return TransformFlow<T>(true, true);
+ }
+};
+
+struct TransformSkip {
+ template <typename T>
+ operator TransformFlow<T>() && { // NOLINT explicit
+ return TransformFlow<T>(false, true);
+ }
+};
+
+template <typename T>
+TransformFlow<T> TransformYield(T value = {}, bool ready_for_next = true) {
+ return TransformFlow<T>(std::move(value), ready_for_next);
+}
+
+template <typename T, typename V>
+using Transformer = std::function<Result<TransformFlow<V>>(T)>;
+
+template <typename T, typename V>
+class TransformIterator {
+ public:
+ explicit TransformIterator(Iterator<T> it, Transformer<T, V> transformer)
+ : it_(std::move(it)),
+ transformer_(std::move(transformer)),
+ last_value_(),
+ finished_() {}
+
+ // Calls the transform function on the current value. Can return in several
ways
+ // * If the next value is requested (e.g. skip) it will return an empty
optional
+ // * If an invalid status is encountered that will be returned
+ // * If finished it will return IterationTraits<V>::End()
+ // * If a value is returned by the transformer that will be returned
+ Result<util::optional<V>> Pump() {
Review comment:
Not anymore.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]