bkietz commented on a change in pull request #7000: URL: https://github.com/apache/arrow/pull/7000#discussion_r412252930
########## File path: cpp/src/arrow/dataset/dataset.h ########## @@ -84,13 +82,12 @@ class ARROW_DS_EXPORT Fragment { class ARROW_DS_EXPORT InMemoryFragment : public Fragment { public: InMemoryFragment(RecordBatchVector record_batches, - std::shared_ptr<ScanOptions> scan_options); + std::shared_ptr<Expression> = NULLPTR); Review comment: `scalar` is declared such that you can use `scalar(true)` for default arguments ```suggestion std::shared_ptr<Expression> = scalar(true)); ``` ########## File path: cpp/src/arrow/dataset/dataset.h ########## @@ -100,16 +97,20 @@ class ARROW_DS_EXPORT InMemoryFragment : public Fragment { RecordBatchVector record_batches_; }; -/// \brief A container of zero or more Fragments. A Dataset acts as a discovery mechanism -/// of Fragments and partitions, e.g. files deeply nested in a directory. +/// \brief A container of zero or more Fragments. +/// +/// A Dataset acts as a union of Fragments, e.g. files deeply nested in a +/// directory. A Dataset has a schema, also known as the "reader" schema. Review comment: Is this known as the "reader" schema outside Avro space? If not I think it's still acceptable to explain this by analogy to Avro, but we should be explicit whose jargon we're borrowing ########## File path: cpp/src/arrow/dataset/dataset.h ########## @@ -100,16 +97,20 @@ class ARROW_DS_EXPORT InMemoryFragment : public Fragment { RecordBatchVector record_batches_; }; -/// \brief A container of zero or more Fragments. A Dataset acts as a discovery mechanism -/// of Fragments and partitions, e.g. files deeply nested in a directory. +/// \brief A container of zero or more Fragments. +/// +/// A Dataset acts as a union of Fragments, e.g. files deeply nested in a +/// directory. A Dataset has a schema, also known as the "reader" schema. 
+/// class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> { public: /// \brief Begin to build a new Scan operation against this Dataset Result<std::shared_ptr<ScannerBuilder>> NewScan(std::shared_ptr<ScanContext> context); Result<std::shared_ptr<ScannerBuilder>> NewScan(); - /// \brief GetFragments returns an iterator of Fragments given ScanOptions. - FragmentIterator GetFragments(std::shared_ptr<ScanOptions> options); + /// \brief GetFragments returns an iterator of Fragments given a predicate. + FragmentIterator GetFragments(std::shared_ptr<Expression> predicate); + FragmentIterator GetFragments(); Review comment: ```suggestion FragmentIterator GetFragments(std::shared_ptr<Expression> predicate = scalar(true)); ``` ########## File path: cpp/src/arrow/dataset/dataset.cc ########## @@ -30,34 +30,40 @@ namespace arrow { namespace dataset { -Fragment::Fragment(std::shared_ptr<ScanOptions> scan_options) - : scan_options_(std::move(scan_options)), partition_expression_(scalar(true)) {} +Fragment::Fragment(std::shared_ptr<Expression> partition_expression) + : partition_expression_(partition_expression ? partition_expression : scalar(true)) {} -const std::shared_ptr<Schema>& Fragment::schema() const { - return scan_options_->schema(); -} +Result<std::shared_ptr<Schema>> InMemoryFragment::ReadPhysicalSchema() { Review comment: Probably a follow-up: `InMemoryFragment` should be constructed with an explicit schema, with a convenience constructor that extracts `batches[0]->schema` and fails if `batches` is empty.
########## File path: cpp/src/arrow/dataset/dataset.cc ########## @@ -72,36 +78,15 @@ Result<std::shared_ptr<ScannerBuilder>> Dataset::NewScan() { return NewScan(std::make_shared<ScanContext>()); } -bool Dataset::AssumePartitionExpression( - const std::shared_ptr<ScanOptions>& scan_options, - std::shared_ptr<ScanOptions>* simplified_scan_options) const { - if (partition_expression_ == nullptr) { - if (simplified_scan_options != nullptr) { - *simplified_scan_options = scan_options; - } - return true; - } +FragmentIterator Dataset::GetFragments() { return GetFragments(scalar(true)); } - auto expr = scan_options->filter->Assume(*partition_expression_); - if (expr->IsNull() || expr->Equals(false)) { - // selector is not satisfiable; yield no fragments - return false; +FragmentIterator Dataset::GetFragments(std::shared_ptr<Expression> predicate) { + if (partition_expression_) { Review comment: Isn't this initialized to `scalar(true)`? ########## File path: cpp/src/arrow/dataset/filter.cc ########## @@ -655,31 +655,32 @@ std::string ScalarExpression::ToString() const { return value_->ToString() + ":" + type_repr; } +using arrow::internal::JoinStrings; Review comment: :+1: ########## File path: cpp/src/arrow/dataset/file_parquet_test.cc ########## @@ -470,8 +466,8 @@ TEST_F(TestParquetFileFormat, PredicatePushdownRowGroupFragments) { CountRowGroupsInFragment(fragment, internal::Iota(5, static_cast<int>(kNumRowGroups)), "i64"_ >= int64_t(6)); - CountRowGroupsInFragment(fragment, {5, 6, 7}, "i64"_ >= int64_t(6), - "i64"_ < int64_t(8)); + CountRowGroupsInFragment(fragment, {5, 6, 7}, + "i64"_ >= int64_t(6) && "i64"_ < int64_t(8)); Review comment: consistency nit ```suggestion "i64"_ >= int64_t(6) and "i64"_ < int64_t(8)); ``` ########## File path: cpp/src/arrow/dataset/dataset.h ########## @@ -30,12 +30,21 @@ namespace arrow { namespace dataset { -/// \brief A granular piece of a Dataset, such as an individual file, which can be -/// read/scanned separately from other 
fragments. +/// \brief A granular piece of a Dataset, such as an individual file. /// -/// A Fragment yields a collection of RecordBatch, encapsulated in one or more ScanTasks. +/// A Fragment can be read/scanned separately from other fragments. It yields a +/// collection of RecordBatch, encapsulated in one or more ScanTasks. +/// +/// A notable difference from Dataset is that Fragments have physical schemas +/// which may differ from Fragments. Review comment: ```suggestion /// Note that Fragments have well-defined physical schemas which are reconciled by /// the Datasets which contain them; these physical schemas may differ from a parent /// Dataset's schema and the physical schemas of sibling Fragments. ``` ########## File path: cpp/src/arrow/dataset/partition.cc ########## @@ -444,10 +432,9 @@ struct DirectoryPartitioningFactory::MakeWritePlanImpl { ARROW_ASSIGN_OR_RAISE(out.partitioning, this_->Finish(std::move(partitioning_schema))); - auto fragment_schema = - source_fragments_.empty() ? schema({}) : source_fragments_.front()->schema(); + // There's no guarantee that all Fragments have the same schema. ARROW_ASSIGN_OR_RAISE(out.schema, - UnifySchemas({out.partitioning->schema(), fragment_schema})); + UnifySchemas({out.partitioning->schema(), schema_})); Review comment: This doesn't drop fields implicit in `out.partitioning`, which means we'll be writing redundant information. This should be handled by removing `scan_options` from the parameters of `FileSystemDataset::Write`; see the comment on `FileSystemDataset::Write` in file_base.cc. ########## File path: python/pyarrow/_dataset.pyx ########## @@ -104,27 +104,45 @@ cdef class Dataset: self.dataset.ReplaceSchema(pyarrow_unwrap_schema(schema))) return Dataset.wrap(move(copy)) - def get_fragments(self, columns=None, filter=None): + def get_fragments(self, Expression filter=None): """Returns an iterator over the fragments in this dataset. Parameters ---------- - columns : list of str, default None - List of columns to project.
filter : Expression, default None - Scan will return only the rows matching the filter. + Return fragments matching the optional filter, either using the + partition_expression or internal information like Parquet's + statistics. Review comment: ```suggestion Return fragments matching the optional filter, using explicit partition expressions and/or embedded information like Parquet's statistics. ``` ########## File path: cpp/src/arrow/dataset/scanner_internal.h ########## @@ -55,28 +56,55 @@ inline RecordBatchIterator ProjectRecordBatch(RecordBatchIterator it, class FilterAndProjectScanTask : public ScanTask { public: - explicit FilterAndProjectScanTask(std::shared_ptr<ScanTask> task) - : ScanTask(task->options(), task->context()), task_(std::move(task)) {} + explicit FilterAndProjectScanTask(std::shared_ptr<ScanTask> task, + std::shared_ptr<Expression> partition) + : ScanTask(task->options(), task->context()), + task_(std::move(task)), + partition_(std::move(partition)), + filter_(NULLPTR), Review comment: Slight tangent: why aren't `*_internal.h` excluded in lint_cpp_cli.py? It seems they should be ########## File path: cpp/src/arrow/dataset/filter.h ########## @@ -183,6 +183,11 @@ class ARROW_DS_EXPORT Expression { return Assume(*given); } + /// Indicates if the expression is satisfiable. + /// + /// This is a shortcut to check if the expression is neither null nor false. + bool IsSatisfiable() const { return !IsNull() && !Equals(false); } Review comment: :+1: ########## File path: python/pyarrow/_dataset.pyx ########## @@ -104,27 +104,45 @@ cdef class Dataset: self.dataset.ReplaceSchema(pyarrow_unwrap_schema(schema))) return Dataset.wrap(move(copy)) - def get_fragments(self, columns=None, filter=None): + def get_fragments(self, Expression filter=None): """Returns an iterator over the fragments in this dataset. Parameters ---------- - columns : list of str, default None - List of columns to project. 
filter : Expression, default None - Scan will return only the rows matching the filter. + Return fragments matching the optional filter, either using the + partition_expression or internal information like Parquet's + statistics. Returns ------- fragments : iterator of Fragment """ - return Scanner(self, columns=columns, filter=filter).get_fragments() + cdef: + CFragmentIterator iterator + shared_ptr[CFragment] fragment - def scan(self, columns=None, filter=None, MemoryPool memory_pool=None): + if filter is None or filter.expr == nullptr: + iterator = self.dataset.GetFragments() + else: + iterator = self.dataset.GetFragments(filter.unwrap()) + + while True: + fragment = GetResultValue(iterator.Next()) + if fragment.get() == nullptr: + raise StopIteration() + else: + yield Fragment.wrap(fragment) + + def _scanner(self, **kwargs): + return Scanner.from_dataset(self, **kwargs) + + def scan(self, columns=None, filter=None, + MemoryPool memory_pool=None, **kwargs): """Builds a scan operation against the dataset. - It poduces a stream of ScanTasks which is meant to be a unit of work to - be dispatched. The tasks are not executed automatically, the user is + It produces a stream of ScanTasks which is meant to be a unit of work + to be dispatched. The tasks are not executed automatically, the user is responsible to execute and dispatch the individual tasks, so custom local task scheduling can be implemented. Review comment: ```suggestion It produces a stream of ScanTasks. Each task is meant to be a unit of work to be dispatched by the user; they are not executed automatically. This allows customization of local task scheduling and execution. 
``` ########## File path: python/pyarrow/_dataset.pyx ########## @@ -104,27 +104,45 @@ cdef class Dataset: self.dataset.ReplaceSchema(pyarrow_unwrap_schema(schema))) return Dataset.wrap(move(copy)) - def get_fragments(self, columns=None, filter=None): + def get_fragments(self, Expression filter=None): """Returns an iterator over the fragments in this dataset. Parameters ---------- - columns : list of str, default None - List of columns to project. filter : Expression, default None - Scan will return only the rows matching the filter. + Return fragments matching the optional filter, either using the + partition_expression or internal information like Parquet's + statistics. Returns ------- fragments : iterator of Fragment """ - return Scanner(self, columns=columns, filter=filter).get_fragments() + cdef: + CFragmentIterator iterator + shared_ptr[CFragment] fragment - def scan(self, columns=None, filter=None, MemoryPool memory_pool=None): + if filter is None or filter.expr == nullptr: Review comment: This coercion is common, should we have `_unwrap_expression_default_true()`? ########## File path: cpp/src/arrow/dataset/file_base.cc ########## @@ -222,9 +214,8 @@ FragmentIterator FileSystemDataset::GetFragmentsImpl( } Result<std::shared_ptr<FileSystemDataset>> FileSystemDataset::Write( - const WritePlan& plan, std::shared_ptr<ScanContext> scan_context) { - std::vector<std::shared_ptr<ScanOptions>> options(plan.paths.size()); - + const WritePlan& plan, std::shared_ptr<ScanOptions> scan_options, Review comment: ```suggestion const WritePlan& plan, ``` Instead, `scan_options` should be derived from `plan.{schema,partitioning}` to project out columns implicit in the partitioning ########## File path: cpp/src/arrow/dataset/dataset.h ########## @@ -30,12 +30,21 @@ namespace arrow { namespace dataset { -/// \brief A granular piece of a Dataset, such as an individual file, which can be -/// read/scanned separately from other fragments. 
+/// \brief A granular piece of a Dataset, such as an individual file. /// -/// A Fragment yields a collection of RecordBatch, encapsulated in one or more ScanTasks. +/// A Fragment can be read/scanned separately from other fragments. It yields a +/// collection of RecordBatch, encapsulated in one or more ScanTasks. Review comment: ```suggestion /// A Fragment can be read/scanned separately from other fragments. It yields a /// collection of RecordBatches when scanned, encapsulated in one or more ScanTasks. ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org