lidavidm commented on a change in pull request #9607:
URL: https://github.com/apache/arrow/pull/9607#discussion_r604965806
##########
File path: cpp/src/arrow/dataset/dataset.h
##########
@@ -133,8 +130,20 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> {
Result<std::shared_ptr<ScannerBuilder>> NewScan();
/// \brief GetFragments returns an iterator of Fragments given a predicate.
- Result<FragmentIterator> GetFragments(Expression predicate);
- Result<FragmentIterator> GetFragments();
+ Future<FragmentVector> GetFragmentsAsync(Expression predicate);
+ Result<FragmentIterator> GetFragments(Expression predicate) {
+ auto fut = GetFragmentsAsync(predicate);
+ fut.Wait();
+ ARROW_ASSIGN_OR_RAISE(auto fragments_vec, fut.result());
+ return MakeVectorIterator(fragments_vec);
+ }
+ Future<FragmentVector> GetFragmentsAsync();
+ Result<FragmentIterator> GetFragments() {
+ auto fut = GetFragmentsAsync();
+ fut.Wait();
+ ARROW_ASSIGN_OR_RAISE(auto fragments_vec, fut.result());
Review comment:
nit: `Future<T>::result()` already calls `Wait()` for you, so the explicit `Wait()` can be dropped.
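i.e. (untested) the sync wrapper could shrink to:

```cpp
Result<FragmentIterator> GetFragments(Expression predicate) {
  auto fut = GetFragmentsAsync(predicate);
  // result() blocks until the future completes, so no explicit Wait() is needed
  ARROW_ASSIGN_OR_RAISE(auto fragments_vec, fut.result());
  return MakeVectorIterator(fragments_vec);
}
```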
##########
File path: cpp/src/arrow/dataset/dataset.h
##########
@@ -191,9 +200,9 @@ class ARROW_DS_EXPORT InMemoryDataset : public Dataset {
std::shared_ptr<Schema> schema) const override;
protected:
- Result<FragmentIterator> GetFragmentsImpl(Expression predicate) override;
+ Future<FragmentVector> GetFragmentsImpl(Expression predicate) override;
Review comment:
This may be suboptimal for the case of writing datasets. ARROW-10882/#9802 backs
an InMemoryDataset with a RecordBatchReader to support passing in Python
generators; if we have to materialize all fragments up front, that defeats the
purpose.
That said, we could implement a lazy InMemoryFragment in place of the current
approach, which materializes an InMemoryFragment for each batch in the source.
So this doesn't block this PR per se; we just need to rework the other PR a bit
once this lands.
##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -166,60 +425,67 @@ Result<std::shared_ptr<Scanner>> ScannerBuilder::Finish() {
return std::make_shared<Scanner>(dataset_, scan_options_);
}
-static inline RecordBatchVector FlattenRecordBatchVector(
- std::vector<RecordBatchVector> nested_batches) {
- RecordBatchVector flattened;
-
- for (auto& task_batches : nested_batches) {
- for (auto& batch : task_batches) {
- flattened.emplace_back(std::move(batch));
- }
- }
-
- return flattened;
-}
-
struct TableAssemblyState {
/// Protecting mutating accesses to batches
std::mutex mutex{};
- std::vector<RecordBatchVector> batches{};
+ std::vector<std::vector<RecordBatchVector>> batches{};
+ int scan_task_id = 0;
- void Emplace(RecordBatchVector b, size_t position) {
+ void Emplace(std::shared_ptr<RecordBatch> batch, size_t fragment_index,
+ size_t task_index, size_t record_batch_index) {
std::lock_guard<std::mutex> lock(mutex);
- if (batches.size() <= position) {
- batches.resize(position + 1);
+ if (batches.size() <= fragment_index) {
+ batches.resize(fragment_index + 1);
+ }
+ if (batches[fragment_index].size() <= task_index) {
+ batches[fragment_index].resize(task_index + 1);
+ }
+ if (batches[fragment_index][task_index].size() <= record_batch_index) {
+ batches[fragment_index][task_index].resize(record_batch_index + 1);
}
- batches[position] = std::move(b);
+ batches[fragment_index][task_index][record_batch_index] = std::move(batch);
}
};
+struct TaggedRecordBatch {
Review comment:
It seems this is unused?
##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -110,35 +110,47 @@ static inline Result<csv::ReadOptions> GetReadOptions(
return read_options;
}
-static inline Result<std::shared_ptr<csv::StreamingReader>> OpenReader(
+static inline Future<std::shared_ptr<csv::StreamingReader>> OpenReaderAsync(
const FileSource& source, const CsvFileFormat& format,
const std::shared_ptr<ScanOptions>& scan_options = nullptr,
MemoryPool* pool = default_memory_pool()) {
  ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options));
- util::string_view first_block;
ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed());
ARROW_ASSIGN_OR_RAISE(
input, io::BufferedInputStream::Create(reader_options.block_size,
default_memory_pool(),
std::move(input)));
- ARROW_ASSIGN_OR_RAISE(first_block, input->Peek(reader_options.block_size));
-
- const auto& parse_options = format.parse_options;
- auto convert_options = csv::ConvertOptions::Defaults();
- if (scan_options != nullptr) {
- ARROW_ASSIGN_OR_RAISE(convert_options,
-                          GetConvertOptions(format, scan_options, first_block, pool));
- }
- auto maybe_reader =
-      csv::StreamingReader::Make(io::IOContext(pool), std::move(input), reader_options,
- parse_options, convert_options);
- if (!maybe_reader.ok()) {
-    return maybe_reader.status().WithMessage("Could not open CSV input source '",
-                                             source.path(), "': ", maybe_reader.status());
- }
+ auto peek_fut = DeferNotOk(input->io_context().executor()->Submit(
Review comment:
nit: is there any benefit to separating the futures out like this instead of
just wrapping it all in one Future?
Also, would it make sense to move `OpenCompressed` into the Future? It may
have to make a HEAD request on S3, for instance. (We could also async-ify all
of the Filesystems, but not in this PR.)
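For illustration only (untested, and glossing over capture/lifetime details; `io_executor` is a stand-in for whichever executor is appropriate, and the body is just the existing synchronous steps moved into a single submitted task):

```cpp
return DeferNotOk(io_executor->Submit(
    [source, format, scan_options, pool]() -> Result<std::shared_ptr<csv::StreamingReader>> {
      ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options));
      // OpenCompressed() (and e.g. an S3 HEAD request behind it) now runs on the executor
      ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed());
      ARROW_ASSIGN_OR_RAISE(
          input, io::BufferedInputStream::Create(reader_options.block_size,
                                                 default_memory_pool(), std::move(input)));
      ARROW_ASSIGN_OR_RAISE(auto first_block, input->Peek(reader_options.block_size));
      auto convert_options = csv::ConvertOptions::Defaults();
      if (scan_options != nullptr) {
        ARROW_ASSIGN_OR_RAISE(convert_options,
                              GetConvertOptions(format, scan_options, first_block, pool));
      }
      return csv::StreamingReader::Make(io::IOContext(pool), std::move(input),
                                        reader_options, format.parse_options,
                                        convert_options);
    }));
```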
##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1029,6 +1102,65 @@ AsyncGenerator<T> MakeConcatenatedGenerator(AsyncGenerator<AsyncGenerator<T>> so
return MergedGenerator<T>(std::move(source), 1);
}
+template <typename T>
+struct Enumerated {
+ util::optional<T> value;
+ int index;
+ bool last;
+};
+
+template <typename T>
+struct IterationTraits<Enumerated<T>> {
+ static Enumerated<T> End() { return Enumerated<T>{{}, -1, false}; }
+  static bool IsEnd(const Enumerated<T>& val) { return !val.value.has_value(); }
+};
+
+template <typename T>
+class EnumeratingGenerator {
+ public:
+ EnumeratingGenerator(AsyncGenerator<T> source, T initial_value)
+      : state_(std::make_shared<State>(std::move(source), std::move(initial_value))) {}
+
+ Future<Enumerated<T>> operator()() {
Review comment:
Ah never mind me, it's because you have `last`.
##########
File path: cpp/src/arrow/dataset/dataset.cc
##########
@@ -105,60 +109,63 @@ Result<std::shared_ptr<ScannerBuilder>> Dataset::NewScan() {
return NewScan(std::make_shared<ScanOptions>());
}
-Result<FragmentIterator> Dataset::GetFragments() {
+Future<FragmentVector> Dataset::GetFragmentsAsync() {
ARROW_ASSIGN_OR_RAISE(auto predicate, literal(true).Bind(*schema_));
- return GetFragments(std::move(predicate));
+ return GetFragmentsAsync(std::move(predicate));
}
-Result<FragmentIterator> Dataset::GetFragments(Expression predicate) {
+Future<FragmentVector> Dataset::GetFragmentsAsync(Expression predicate) {
ARROW_ASSIGN_OR_RAISE(
      predicate, SimplifyWithGuarantee(std::move(predicate), partition_expression_));
return predicate.IsSatisfiable() ? GetFragmentsImpl(std::move(predicate))
-                                   : MakeEmptyIterator<std::shared_ptr<Fragment>>();
+ : FragmentVector{};
}
-struct VectorRecordBatchGenerator : InMemoryDataset::RecordBatchGenerator {
- explicit VectorRecordBatchGenerator(RecordBatchVector batches)
+struct VectorRecordBatchVectorFactory : InMemoryDataset::RecordBatchVectorFactory {
+ explicit VectorRecordBatchVectorFactory(RecordBatchVector batches)
: batches_(std::move(batches)) {}
-  RecordBatchIterator Get() const final { return MakeVectorIterator(batches_); }
+ Result<RecordBatchVector> Get() const final { return batches_; }
RecordBatchVector batches_;
};
InMemoryDataset::InMemoryDataset(std::shared_ptr<Schema> schema,
RecordBatchVector batches)
: Dataset(std::move(schema)),
- get_batches_(new VectorRecordBatchGenerator(std::move(batches))) {}
+ get_batches_(new VectorRecordBatchVectorFactory(std::move(batches))) {}
-struct TableRecordBatchGenerator : InMemoryDataset::RecordBatchGenerator {
- explicit TableRecordBatchGenerator(std::shared_ptr<Table> table)
+struct TableRecordBatchVectorFactory : InMemoryDataset::RecordBatchVectorFactory {
+ explicit TableRecordBatchVectorFactory(std::shared_ptr<Table> table)
: table_(std::move(table)) {}
- RecordBatchIterator Get() const final {
+ Result<RecordBatchVector> Get() const final {
auto reader = std::make_shared<TableBatchReader>(*table_);
auto table = table_;
- return MakeFunctionIterator([reader, table] { return reader->Next(); });
+    auto iter = MakeFunctionIterator([reader, table] { return reader->Next(); });
+ return iter.ToVector();
}
std::shared_ptr<Table> table_;
};
InMemoryDataset::InMemoryDataset(std::shared_ptr<Table> table)
: Dataset(table->schema()),
- get_batches_(new TableRecordBatchGenerator(std::move(table))) {}
+ get_batches_(new TableRecordBatchVectorFactory(std::move(table))) {}
Result<std::shared_ptr<Dataset>> InMemoryDataset::ReplaceSchema(
std::shared_ptr<Schema> schema) const {
RETURN_NOT_OK(CheckProjectable(*schema_, *schema));
return std::make_shared<InMemoryDataset>(std::move(schema), get_batches_);
}
-Result<FragmentIterator> InMemoryDataset::GetFragmentsImpl(Expression) {
+Future<FragmentVector> InMemoryDataset::GetFragmentsImpl(Expression) {
auto schema = this->schema();
- auto create_fragment =
-      [schema](std::shared_ptr<RecordBatch> batch) -> Result<std::shared_ptr<Fragment>> {
+ // FIXME Need auto here
Review comment:
Instead of the FIXME, we could just rewrite this to be a direct loop,
right?
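Something like (untested; assumes `InMemoryFragment` can still be built from the schema plus a single-batch vector, as the current lambda does):

```cpp
Future<FragmentVector> InMemoryDataset::GetFragmentsImpl(Expression) {
  auto schema = this->schema();
  auto batches = get_batches_->Get();
  if (!batches.ok()) {
    return Future<FragmentVector>::MakeFinished(batches.status());
  }
  FragmentVector fragments;
  fragments.reserve(batches->size());
  for (const auto& batch : *batches) {
    // one in-memory fragment per batch, mirroring the current lambda
    fragments.push_back(
        std::make_shared<InMemoryFragment>(schema, RecordBatchVector{batch}));
  }
  return Future<FragmentVector>::MakeFinished(std::move(fragments));
}
```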
##########
File path: cpp/src/arrow/dataset/dataset_internal.h
##########
@@ -29,34 +29,40 @@
#include "arrow/record_batch.h"
#include "arrow/scalar.h"
#include "arrow/type.h"
+#include "arrow/util/future.h"
#include "arrow/util/iterator.h"
#include "arrow/util/optional.h"
+#include "arrow/util/vector.h"
namespace arrow {
namespace dataset {
/// \brief GetFragmentsFromDatasets transforms a vector<Dataset> into a
/// flattened FragmentIterator.
-inline Result<FragmentIterator> GetFragmentsFromDatasets(const DatasetVector& datasets,
-                                                          Expression predicate) {
- // Iterator<Dataset>
- auto datasets_it = MakeVectorIterator(datasets);
-
- // Dataset -> Iterator<Fragment>
-  auto fn = [predicate](std::shared_ptr<Dataset> dataset) -> Result<FragmentIterator> {
- return dataset->GetFragments(predicate);
+inline Future<FragmentVector> GetFragmentsFromDatasets(const DatasetVector& datasets,
+ Expression predicate) {
+ // Dataset -> Future<FragmentVector>
+  auto fn = [predicate](std::shared_ptr<Dataset> dataset) -> Future<FragmentVector> {
+ return dataset->GetFragmentsAsync(predicate);
};
- // Iterator<Iterator<Fragment>>
- auto fragments_it = MakeMaybeMapIterator(fn, std::move(datasets_it));
+ auto fragment_futures = internal::MapVector(fn, datasets);
- // Iterator<Fragment>
- return MakeFlattenIterator(std::move(fragments_it));
+ return All(fragment_futures)
+ .Then([](const std::vector<Result<FragmentVector>>& fragment_vecs)
+ -> Result<FragmentVector> {
+        ARROW_ASSIGN_OR_RAISE(auto unwrapped_vecs, internal::UnwrapOrRaise(fragment_vecs))
+ return internal::FlattenVectors(std::move(unwrapped_vecs));
+ });
}
-inline RecordBatchIterator IteratorFromReader(
- const std::shared_ptr<RecordBatchReader>& reader) {
- return MakeFunctionIterator([reader] { return reader->Next(); });
+inline RecordBatchGenerator GeneratorFromReader(
Review comment:
As far as I can see this is unused
##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -166,60 +425,67 @@ Result<std::shared_ptr<Scanner>> ScannerBuilder::Finish() {
return std::make_shared<Scanner>(dataset_, scan_options_);
}
-static inline RecordBatchVector FlattenRecordBatchVector(
- std::vector<RecordBatchVector> nested_batches) {
- RecordBatchVector flattened;
-
- for (auto& task_batches : nested_batches) {
- for (auto& batch : task_batches) {
- flattened.emplace_back(std::move(batch));
- }
- }
-
- return flattened;
-}
-
struct TableAssemblyState {
/// Protecting mutating accesses to batches
std::mutex mutex{};
- std::vector<RecordBatchVector> batches{};
+ std::vector<std::vector<RecordBatchVector>> batches{};
+ int scan_task_id = 0;
- void Emplace(RecordBatchVector b, size_t position) {
+ void Emplace(std::shared_ptr<RecordBatch> batch, size_t fragment_index,
+ size_t task_index, size_t record_batch_index) {
std::lock_guard<std::mutex> lock(mutex);
- if (batches.size() <= position) {
- batches.resize(position + 1);
+ if (batches.size() <= fragment_index) {
+ batches.resize(fragment_index + 1);
+ }
+ if (batches[fragment_index].size() <= task_index) {
+ batches[fragment_index].resize(task_index + 1);
+ }
+ if (batches[fragment_index][task_index].size() <= record_batch_index) {
+ batches[fragment_index][task_index].resize(record_batch_index + 1);
}
- batches[position] = std::move(b);
+ batches[fragment_index][task_index][record_batch_index] = std::move(batch);
}
};
+struct TaggedRecordBatch {
+ std::shared_ptr<RecordBatch> record_batch;
+};
+
Result<std::shared_ptr<Table>> Scanner::ToTable() {
- ARROW_ASSIGN_OR_RAISE(auto scan_task_it, Scan());
- auto task_group = scan_options_->TaskGroup();
+ auto table_fut = ToTableAsync();
+ table_fut.Wait();
Review comment:
Unless I'm missing something, the explicit `Wait()` is redundant here too, since `result()` already waits.
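i.e. (untested, assuming `ToTableAsync()` returns `Future<std::shared_ptr<Table>>`) I'd expect this to reduce to:

```cpp
Result<std::shared_ptr<Table>> Scanner::ToTable() {
  // result() already waits for the future to complete
  return ToTableAsync().result();
}
```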
##########
File path: cpp/src/arrow/dataset/dataset_internal.h
##########
@@ -29,34 +29,40 @@
#include "arrow/record_batch.h"
#include "arrow/scalar.h"
#include "arrow/type.h"
+#include "arrow/util/future.h"
#include "arrow/util/iterator.h"
#include "arrow/util/optional.h"
+#include "arrow/util/vector.h"
namespace arrow {
namespace dataset {
/// \brief GetFragmentsFromDatasets transforms a vector<Dataset> into a
/// flattened FragmentIterator.
Review comment:
nit: comment is out of date
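For example, something along the lines of:

```cpp
/// \brief GetFragmentsFromDatasets transforms a vector<Dataset> into a
/// flattened Future<FragmentVector>.
```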