bkietz commented on a change in pull request #7534:
URL: https://github.com/apache/arrow/pull/7534#discussion_r453914180



##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -738,68 +749,90 @@ Status GetReader(const SchemaField& field, const 
std::shared_ptr<ReaderContext>&
   END_PARQUET_CATCH_EXCEPTIONS
 }
 
-Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& 
row_group_indices,
+Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,
                                             const std::vector<int>& 
column_indices,
                                             
std::unique_ptr<RecordBatchReader>* out) {
-  // row group indices check
-  for (int row_group_index : row_group_indices) {
-    RETURN_NOT_OK(BoundsCheckRowGroup(row_group_index));
-  }
+  RETURN_NOT_OK(BoundsCheck(row_groups, column_indices));
 
-  // column indices check
-  ARROW_ASSIGN_OR_RAISE(std::vector<int> field_indices,
-                        manifest_.GetFieldIndices(column_indices));
+  if (reader_properties_.pre_buffer()) {
+    // PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if 
enabled
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    reader_->PreBuffer(row_groups, column_indices, 
reader_properties_.async_context(),
+                       reader_properties_.cache_options());
+    END_PARQUET_CATCH_EXCEPTIONS
+  }
 
+  std::vector<std::shared_ptr<ColumnReaderImpl>> readers;
   std::shared_ptr<::arrow::Schema> batch_schema;
-  RETURN_NOT_OK(GetSchema(&batch_schema));
+  RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, 
&batch_schema));
+
+  if (readers.empty()) {
+    // Just generate all batches right now; they're cheap since they have no 
columns.
+    int64_t batch_size = properties().batch_size();
+    auto max_sized_batch =
+        ::arrow::RecordBatch::Make(batch_schema, batch_size, 
::arrow::ArrayVector{});
+
+    ::arrow::RecordBatchVector batches;
+
+    for (int row_group : row_groups) {
+      int64_t num_rows = 
parquet_reader()->metadata()->RowGroup(row_group)->num_rows();
 
-  // filter to only arrow::Fields which contain the selected physical columns
-  {
-    ::arrow::FieldVector selected_fields;
+      batches.insert(batches.end(), num_rows / batch_size, max_sized_batch);
 
-    for (int field_idx : field_indices) {
-      selected_fields.push_back(batch_schema->field(field_idx));
+      if (int64_t trailing_rows = num_rows % batch_size) {
+        batches.push_back(max_sized_batch->Slice(0, trailing_rows));
+      }
     }
 
-    batch_schema = ::arrow::schema(std::move(selected_fields));
-  }
+    *out = ::arrow::internal::make_unique<RowGroupRecordBatchReader>(
+        ::arrow::MakeVectorIterator(std::move(batches)), 
std::move(batch_schema));
 
-  if (reader_properties_.pre_buffer()) {
-    // PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if 
enabled
-    BEGIN_PARQUET_CATCH_EXCEPTIONS
-    reader_->PreBuffer(row_group_indices, column_indices,
-                       reader_properties_.async_context(),
-                       reader_properties_.cache_options());
-    END_PARQUET_CATCH_EXCEPTIONS
+    return Status::OK();
   }
 
   using ::arrow::RecordBatchIterator;
 
-  // NB: This lambda will be invoked lazily whenever a new row group must be
-  // scanned, so it must capture `column_indices` by value (it will not
-  // otherwise outlive the scope of `GetRecordBatchReader()`). `this` is a 
non-owning
-  // pointer so we are relying on the parent FileReader outliving this 
RecordBatchReader.
-  auto row_group_index_to_batch_iterator =
-      [column_indices, this](const int* i) -> 
::arrow::Result<RecordBatchIterator> {
-    std::shared_ptr<::arrow::Table> table;
-    RETURN_NOT_OK(RowGroup(*i)->ReadTable(column_indices, &table));
-
-    auto table_reader = std::make_shared<::arrow::TableBatchReader>(*table);
-    table_reader->set_chunksize(properties().batch_size());
-
-    // NB: explicitly preserve table so that table_reader doesn't outlive it
-    return ::arrow::MakeFunctionIterator(
-        [table, table_reader] { return table_reader->Next(); });
-  };
-
-  ::arrow::Iterator<RecordBatchIterator> row_group_batches =
-      ::arrow::MakeMaybeMapIterator(
-          std::move(row_group_index_to_batch_iterator),
-          ::arrow::MakeVectorPointingIterator(row_group_indices));
+  // NB: This lambda will be invoked outside the scope of this call to
+  // `GetRecordBatchReader()`, so it must capture `readers` and `batch_schema` 
by value.
+  // `this` is a non-owning pointer so we are relying on the parent FileReader 
outliving
+  // this RecordBatchReader.
+  ::arrow::Iterator<RecordBatchIterator> batches = 
::arrow::MakeFunctionIterator(
+      [readers, batch_schema, this]() -> ::arrow::Result<RecordBatchIterator> {
+        // Get the next chunks for each column
+        // (chunks[i] contains the chunks for column i).
+        std::vector<::arrow::ArrayVector> chunks(readers.size());
+        for (size_t i = 0; i < readers.size(); ++i) {
+          std::shared_ptr<ChunkedArray> chunk;
+          do {
+            RETURN_NOT_OK(readers[i]->NextBatch(properties().batch_size(), 
&chunk));
+            if (chunk == nullptr) {
+              return ::arrow::IterationTraits<RecordBatchIterator>::End();
+            }
+          } while (chunk->length() == 0);

Review comment:
       IMHO this check should be internal to the ColumnReaders. I tried 
embedding it in TransferColumnData, but discovered that other locations in the 
codebase rely on empty chunks. Is this expected, or should it be fixed?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to