bkietz commented on a change in pull request #7000:
URL: https://github.com/apache/arrow/pull/7000#discussion_r412252930



##########
File path: cpp/src/arrow/dataset/dataset.h
##########
@@ -84,13 +82,12 @@ class ARROW_DS_EXPORT Fragment {
 class ARROW_DS_EXPORT InMemoryFragment : public Fragment {
  public:
   InMemoryFragment(RecordBatchVector record_batches,
-                   std::shared_ptr<ScanOptions> scan_options);
+                   std::shared_ptr<Expression> = NULLPTR);

Review comment:
       `scalar` is declared such that you can use `scalar(true)` for default arguments.
   ```suggestion
                      std::shared_ptr<Expression> = scalar(true));
   ```

##########
File path: cpp/src/arrow/dataset/dataset.h
##########
@@ -100,16 +97,20 @@ class ARROW_DS_EXPORT InMemoryFragment : public Fragment {
   RecordBatchVector record_batches_;
 };
 
-/// \brief A container of zero or more Fragments. A Dataset acts as a discovery mechanism
-/// of Fragments and partitions, e.g. files deeply nested in a directory.
+/// \brief A container of zero or more Fragments.
+///
+/// A Dataset acts as a union of Fragments, e.g. files deeply nested in a
+/// directory. A Dataset has a schema, also known as the "reader" schema.

Review comment:
       Is this known as the "reader" schema outside Avro space? If not, I think it's still acceptable to explain this by analogy to Avro, but we should be explicit about whose jargon we're borrowing.

##########
File path: cpp/src/arrow/dataset/dataset.h
##########
@@ -100,16 +97,20 @@ class ARROW_DS_EXPORT InMemoryFragment : public Fragment {
   RecordBatchVector record_batches_;
 };
 
-/// \brief A container of zero or more Fragments. A Dataset acts as a discovery mechanism
-/// of Fragments and partitions, e.g. files deeply nested in a directory.
+/// \brief A container of zero or more Fragments.
+///
+/// A Dataset acts as a union of Fragments, e.g. files deeply nested in a
+/// directory. A Dataset has a schema, also known as the "reader" schema.
+///
 class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> {
  public:
   /// \brief Begin to build a new Scan operation against this Dataset
   Result<std::shared_ptr<ScannerBuilder>> NewScan(std::shared_ptr<ScanContext> context);
   Result<std::shared_ptr<ScannerBuilder>> NewScan();
 
-  /// \brief GetFragments returns an iterator of Fragments given ScanOptions.
-  FragmentIterator GetFragments(std::shared_ptr<ScanOptions> options);
+  /// \brief GetFragments returns an iterator of Fragments given a predicate.
+  FragmentIterator GetFragments(std::shared_ptr<Expression> predicate);
+  FragmentIterator GetFragments();

Review comment:
       ```suggestion
  FragmentIterator GetFragments(std::shared_ptr<Expression> predicate = scalar(true));
   ```

##########
File path: cpp/src/arrow/dataset/dataset.cc
##########
@@ -30,34 +30,40 @@
 namespace arrow {
 namespace dataset {
 
-Fragment::Fragment(std::shared_ptr<ScanOptions> scan_options)
-    : scan_options_(std::move(scan_options)), partition_expression_(scalar(true)) {}
+Fragment::Fragment(std::shared_ptr<Expression> partition_expression)
+    : partition_expression_(partition_expression ? partition_expression : scalar(true)) {}
 
-const std::shared_ptr<Schema>& Fragment::schema() const {
-  return scan_options_->schema();
-}
+Result<std::shared_ptr<Schema>> InMemoryFragment::ReadPhysicalSchema() {

Review comment:
       Probably a follow-up: `InMemoryFragment` should be constructed with an explicit schema, plus a convenience constructor that extracts `batches[0]->schema()` and fails if the batches are empty.

##########
File path: cpp/src/arrow/dataset/dataset.cc
##########
@@ -72,36 +78,15 @@ Result<std::shared_ptr<ScannerBuilder>> Dataset::NewScan() {
   return NewScan(std::make_shared<ScanContext>());
 }
 
-bool Dataset::AssumePartitionExpression(
-    const std::shared_ptr<ScanOptions>& scan_options,
-    std::shared_ptr<ScanOptions>* simplified_scan_options) const {
-  if (partition_expression_ == nullptr) {
-    if (simplified_scan_options != nullptr) {
-      *simplified_scan_options = scan_options;
-    }
-    return true;
-  }
+FragmentIterator Dataset::GetFragments() { return GetFragments(scalar(true)); }
 
-  auto expr = scan_options->filter->Assume(*partition_expression_);
-  if (expr->IsNull() || expr->Equals(false)) {
-    // selector is not satisfiable; yield no fragments
-    return false;
+FragmentIterator Dataset::GetFragments(std::shared_ptr<Expression> predicate) {
+  if (partition_expression_) {

Review comment:
       Isn't this initialized to `scalar(true)`?

##########
File path: cpp/src/arrow/dataset/filter.cc
##########
@@ -655,31 +655,32 @@ std::string ScalarExpression::ToString() const {
   return value_->ToString() + ":" + type_repr;
 }
 
+using arrow::internal::JoinStrings;

Review comment:
       :+1: 

##########
File path: cpp/src/arrow/dataset/file_parquet_test.cc
##########
@@ -470,8 +466,8 @@ TEST_F(TestParquetFileFormat, PredicatePushdownRowGroupFragments) {
   CountRowGroupsInFragment(fragment, internal::Iota(5, static_cast<int>(kNumRowGroups)),
                            "i64"_ >= int64_t(6));
 
-  CountRowGroupsInFragment(fragment, {5, 6, 7}, "i64"_ >= int64_t(6),
-                           "i64"_ < int64_t(8));
+  CountRowGroupsInFragment(fragment, {5, 6, 7},
+                           "i64"_ >= int64_t(6) && "i64"_ < int64_t(8));

Review comment:
       consistency nit
   ```suggestion
                              "i64"_ >= int64_t(6) and "i64"_ < int64_t(8));
   ```

##########
File path: cpp/src/arrow/dataset/dataset.h
##########
@@ -30,12 +30,21 @@
 namespace arrow {
 namespace dataset {
 
-/// \brief A granular piece of a Dataset, such as an individual file, which can be
-/// read/scanned separately from other fragments.
+/// \brief A granular piece of a Dataset, such as an individual file.
 ///
-/// A Fragment yields a collection of RecordBatch, encapsulated in one or more ScanTasks.
+/// A Fragment can be read/scanned separately from other fragments. It yields a
+/// collection of RecordBatch, encapsulated in one or more ScanTasks.
+///
+/// A notable difference from Dataset is that Fragments have physical schemas
+/// which may differ from Fragments.

Review comment:
       ```suggestion
   /// Note that Fragments have well defined physical schemas which are reconciled by
   /// the Datasets which contain them; these physical schemas may differ from a parent
   /// Dataset's schema and the physical schemas of sibling Fragments.
   ```

##########
File path: cpp/src/arrow/dataset/partition.cc
##########
@@ -444,10 +432,9 @@ struct DirectoryPartitioningFactory::MakeWritePlanImpl {
     ARROW_ASSIGN_OR_RAISE(out.partitioning,
                           this_->Finish(std::move(partitioning_schema)));
 
-    auto fragment_schema =
-        source_fragments_.empty() ? schema({}) : source_fragments_.front()->schema();
+    // There's no guarantee that all Fragments have the same schema.
     ARROW_ASSIGN_OR_RAISE(out.schema,
-                          UnifySchemas({out.partitioning->schema(), fragment_schema}));
+                          UnifySchemas({out.partitioning->schema(), schema_}));

Review comment:
       This doesn't drop fields implicit in `out.partitioning`, which means we'll be writing redundant information. This should be handled by removing `scan_options` from the parameters of `FileDataset::Write`; see the following comment and the sketch below.

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -104,27 +104,45 @@ cdef class Dataset:
             self.dataset.ReplaceSchema(pyarrow_unwrap_schema(schema)))
         return Dataset.wrap(move(copy))
 
-    def get_fragments(self, columns=None, filter=None):
+    def get_fragments(self, Expression filter=None):
         """Returns an iterator over the fragments in this dataset.
 
         Parameters
         ----------
-        columns : list of str, default None
-            List of columns to project.
         filter : Expression, default None
-            Scan will return only the rows matching the filter.
+            Return fragments matching the optional filter, either using the
+            partition_expression or internal information like Parquet's
+            statistics.

Review comment:
       ```suggestion
               Return fragments matching the optional filter, using explicit
               partition expressions and/or embedded information like Parquet's
               statistics.
   ```

##########
File path: cpp/src/arrow/dataset/scanner_internal.h
##########
@@ -55,28 +56,55 @@ inline RecordBatchIterator ProjectRecordBatch(RecordBatchIterator it,
 
 class FilterAndProjectScanTask : public ScanTask {
  public:
-  explicit FilterAndProjectScanTask(std::shared_ptr<ScanTask> task)
-      : ScanTask(task->options(), task->context()), task_(std::move(task)) {}
+  explicit FilterAndProjectScanTask(std::shared_ptr<ScanTask> task,
+                                    std::shared_ptr<Expression> partition)
+      : ScanTask(task->options(), task->context()),
+        task_(std::move(task)),
+        partition_(std::move(partition)),
+        filter_(NULLPTR),

Review comment:
       Slight tangent: why aren't `*_internal.h` headers excluded in lint_cpp_cli.py? It seems they should be.

##########
File path: cpp/src/arrow/dataset/filter.h
##########
@@ -183,6 +183,11 @@ class ARROW_DS_EXPORT Expression {
     return Assume(*given);
   }
 
+  /// Indicates if the expression is satisfiable.
+  ///
+  /// This is a shortcut to check if the expression is neither null nor false.
+  bool IsSatisfiable() const { return !IsNull() && !Equals(false); }

Review comment:
       :+1: 

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -104,27 +104,45 @@ cdef class Dataset:
             self.dataset.ReplaceSchema(pyarrow_unwrap_schema(schema)))
         return Dataset.wrap(move(copy))
 
-    def get_fragments(self, columns=None, filter=None):
+    def get_fragments(self, Expression filter=None):
         """Returns an iterator over the fragments in this dataset.
 
         Parameters
         ----------
-        columns : list of str, default None
-            List of columns to project.
         filter : Expression, default None
-            Scan will return only the rows matching the filter.
+            Return fragments matching the optional filter, either using the
+            partition_expression or internal information like Parquet's
+            statistics.
 
         Returns
         -------
         fragments : iterator of Fragment
         """
-        return Scanner(self, columns=columns, filter=filter).get_fragments()
+        cdef:
+            CFragmentIterator iterator
+            shared_ptr[CFragment] fragment
 
-    def scan(self, columns=None, filter=None, MemoryPool memory_pool=None):
+        if filter is None or filter.expr == nullptr:
+            iterator = self.dataset.GetFragments()
+        else:
+            iterator = self.dataset.GetFragments(filter.unwrap())
+
+        while True:
+            fragment = GetResultValue(iterator.Next())
+            if fragment.get() == nullptr:
+                raise StopIteration()
+            else:
+                yield Fragment.wrap(fragment)
+
+    def _scanner(self, **kwargs):
+        return Scanner.from_dataset(self, **kwargs)
+
+    def scan(self, columns=None, filter=None,
+             MemoryPool memory_pool=None, **kwargs):
         """Builds a scan operation against the dataset.
 
-        It poduces a stream of ScanTasks which is meant to be a unit of work to
-        be dispatched. The tasks are not executed automatically, the user is
+        It produces a stream of ScanTasks which is meant to be a unit of work
+        to be dispatched. The tasks are not executed automatically, the user is
         responsible to execute and dispatch the individual tasks, so custom
         local task scheduling can be implemented.

Review comment:
       ```suggestion
        It produces a stream of ScanTasks. Each task is meant to be a unit of work
        to be dispatched by the user; they are not executed automatically. This allows
           customization of local task scheduling and execution.
   ```

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -104,27 +104,45 @@ cdef class Dataset:
             self.dataset.ReplaceSchema(pyarrow_unwrap_schema(schema)))
         return Dataset.wrap(move(copy))
 
-    def get_fragments(self, columns=None, filter=None):
+    def get_fragments(self, Expression filter=None):
         """Returns an iterator over the fragments in this dataset.
 
         Parameters
         ----------
-        columns : list of str, default None
-            List of columns to project.
         filter : Expression, default None
-            Scan will return only the rows matching the filter.
+            Return fragments matching the optional filter, either using the
+            partition_expression or internal information like Parquet's
+            statistics.
 
         Returns
         -------
         fragments : iterator of Fragment
         """
-        return Scanner(self, columns=columns, filter=filter).get_fragments()
+        cdef:
+            CFragmentIterator iterator
+            shared_ptr[CFragment] fragment
 
-    def scan(self, columns=None, filter=None, MemoryPool memory_pool=None):
+        if filter is None or filter.expr == nullptr:

Review comment:
       This coercion is common, should we have `_unwrap_expression_default_true()`?

##########
File path: cpp/src/arrow/dataset/file_base.cc
##########
@@ -222,9 +214,8 @@ FragmentIterator FileSystemDataset::GetFragmentsImpl(
 }
 
 Result<std::shared_ptr<FileSystemDataset>> FileSystemDataset::Write(
-    const WritePlan& plan, std::shared_ptr<ScanContext> scan_context) {
-  std::vector<std::shared_ptr<ScanOptions>> options(plan.paths.size());
-
+    const WritePlan& plan, std::shared_ptr<ScanOptions> scan_options,

Review comment:
       ```suggestion
       const WritePlan& plan,
   ```
   
   Instead, `scan_options` should be derived from `plan.{schema,partitioning}` to project out columns implicit in the partitioning.

##########
File path: cpp/src/arrow/dataset/dataset.h
##########
@@ -30,12 +30,21 @@
 namespace arrow {
 namespace dataset {
 
-/// \brief A granular piece of a Dataset, such as an individual file, which can be
-/// read/scanned separately from other fragments.
+/// \brief A granular piece of a Dataset, such as an individual file.
 ///
-/// A Fragment yields a collection of RecordBatch, encapsulated in one or more ScanTasks.
+/// A Fragment can be read/scanned separately from other fragments. It yields a
+/// collection of RecordBatch, encapsulated in one or more ScanTasks.

Review comment:
       ```suggestion
   /// A Fragment can be read/scanned separately from other fragments. It yields a
   /// collection of RecordBatches when scanned, encapsulated in one or more ScanTasks.
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

