miguelpragier opened a new pull request, #40090:
URL: https://github.com/apache/arrow/pull/40090
### Rationale for this change
Enable support for large recordsets in the Go FlightSQL driver by replacing
the **download-all-at-once, read-later** approach with a
**download-chunk-as-reading** approach.
The primary motivation is to let the driver handle large recordsets without
unnecessary memory pre-allocation. With a concurrent streaming approach, the
driver avoids loading the entire dataset into memory at once.
### Description:
Implementing Concurrent Record Streaming to Better Support the Handling of
Large Recordsets.
For retrieving a recordset, the current implementation works as follows:
- An SQL query results in a set of [endpoints] and a query ticket.
- Each [endpoint] is requested (with the generated ticket), and its response
is a [reader].
- Each reader is iterated for records. Each record is, in effect, a batch of
rows.
- All the retrieved rows are stored at once in an array.
- This means that data, potentially comprising billions of rows, is
synchronously read into an array.
- After this array is filled, it is then returned, all at once, to the
consumer.
- This can result in out-of-memory failures, or at the very least,
unnecessary waiting times and huge pre-allocations.
### Proposed Changes:
Iterate over [endpoints], [readers], and [records] ad hoc, reading only the
necessary data according to consumer demand.
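
The demand-driven iteration described above can be sketched as follows. This is a minimal, dependency-free illustration and not the driver's actual code: `record`, `reader`, `endpoint`, and `lazyRows` are hypothetical stand-ins for Arrow records, Flight readers, and endpoints, and `open` merely simulates the per-endpoint request.

```go
package main

import "fmt"

// record is a hypothetical stand-in for an Arrow record (a batch of rows).
type record struct{ rows []int }

// reader is a hypothetical stand-in for the reader returned by an endpoint.
type reader struct {
	batches []record
	pos     int
}

// next returns the following record, or false when the reader is exhausted.
func (r *reader) next() (record, bool) {
	if r.pos >= len(r.batches) {
		return record{}, false
	}
	rec := r.batches[r.pos]
	r.pos++
	return rec, true
}

// endpoint is a hypothetical stand-in for a Flight endpoint.
type endpoint struct{ batches []record }

// open simulates requesting the endpoint with the query ticket.
func (e endpoint) open() *reader { return &reader{batches: e.batches} }

// lazyRows walks endpoints, readers, and records only as the consumer asks.
type lazyRows struct {
	endpoints []endpoint
	epIdx     int     // index of the next endpoint to request
	cur       *reader // reader currently being drained, if any
	opened    int     // how many endpoints have been requested so far
}

// nextRecord returns the next record, opening a new endpoint only when the
// current reader has nothing left.
func (l *lazyRows) nextRecord() (record, bool) {
	for {
		if l.cur == nil {
			if l.epIdx >= len(l.endpoints) {
				return record{}, false
			}
			l.cur = l.endpoints[l.epIdx].open()
			l.epIdx++
			l.opened++
		}
		if rec, ok := l.cur.next(); ok {
			return rec, true
		}
		l.cur = nil // reader exhausted; move on to the next endpoint
	}
}

func main() {
	rows := &lazyRows{endpoints: []endpoint{
		{batches: []record{{rows: []int{1, 2}}, {rows: []int{3}}}},
		{batches: []record{{rows: []int{4, 5}}}},
	}}
	for {
		rec, ok := rows.nextRecord()
		if !ok {
			break
		}
		fmt.Println(rec.rows, "endpoints opened so far:", rows.opened)
	}
}
```

In this sketch the second endpoint is not requested until the first reader is drained, so at most one record is held by the consumer at a time.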
### What changes are included in this PR?
**1. Introduction of `sync.Mutex`:**
- The `Rows` struct has been updated to include a `currentRecordMux`
mutex. This addition ensures that operations involving the release of the
current record are thread-safe, thus preventing potential race conditions in a
concurrent environment.
**2. Channels for Asynchronous Record Fetching:**
- A new buffered channel, `recordChan`, has been added to the `Rows`
struct. This channel lets the driver fetch and queue records asynchronously.
It decouples fetching from consumption (the producer blocks only when the
buffer is full), which is particularly advantageous when dealing with large
recordsets.
**3. Asynchronous Record Streaming via Goroutines:**
- The `streamRecordset` function has been introduced and is designed to
run concurrently using goroutines. This modification permits the driver to
begin processing records as soon as they are received, without having to wait
for the entire recordset to be loaded into memory.
**4. Improved Record Management:**
- A new method, `releaseRecord`, has been created to manage the lifecycle
of the current record. This method ensures that resources are released when a
record is no longer needed, thus reducing the memory footprint when processing
large datasets.
**5. Refactoring of the `Next` Method:**
- The `Next` method in the `Rows` struct has been refactored to suit the
new streaming model. It now efficiently waits for and retrieves the next
available record from the `recordChan` channel, enabling a smooth and
memory-efficient iteration over large datasets.
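
How the five pieces listed above might fit together can be sketched as follows. This is a simplified illustration under stated assumptions, not the code in this PR: the `record` type, the `newRows` helper, the `released` flag, and the `Next` signature are hypothetical, and only the names `recordChan`, `currentRecordMux`, `streamRecordset`, and `releaseRecord` come from the description.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// record is a hypothetical stand-in for an Arrow record; released mimics
// the effect of releasing a reference-counted record.
type record struct {
	values   []int
	released bool
}

// rows is a simplified, hypothetical version of the driver's Rows struct.
type rows struct {
	recordChan       chan *record // buffered queue of fetched records
	currentRecordMux sync.Mutex   // guards currentRecord during release/swap
	currentRecord    *record
	pos              int // read position inside currentRecord
}

// newRows (hypothetical helper) starts the producer goroutine.
func newRows(source []*record, buffer int) *rows {
	r := &rows{recordChan: make(chan *record, buffer)}
	go r.streamRecordset(source)
	return r
}

// streamRecordset queues records and closes the channel when done.
// The send blocks while the buffer is full, which bounds memory use.
func (r *rows) streamRecordset(source []*record) {
	defer close(r.recordChan)
	for _, rec := range source {
		r.recordChan <- rec
	}
}

// releaseRecord drops the current record, if any, under the mutex.
func (r *rows) releaseRecord() {
	r.currentRecordMux.Lock()
	defer r.currentRecordMux.Unlock()
	if r.currentRecord != nil {
		r.currentRecord.released = true
		r.currentRecord = nil
	}
}

// Next returns the next value, pulling a new record from recordChan when
// the current one is exhausted, and io.EOF once the channel is closed.
// In this sketch only the consumer goroutine reads currentRecord, so the
// loop condition is checked without the lock.
func (r *rows) Next() (int, error) {
	for r.currentRecord == nil || r.pos >= len(r.currentRecord.values) {
		r.releaseRecord()
		rec, ok := <-r.recordChan
		if !ok {
			return 0, io.EOF
		}
		r.currentRecordMux.Lock()
		r.currentRecord, r.pos = rec, 0
		r.currentRecordMux.Unlock()
	}
	v := r.currentRecord.values[r.pos]
	r.pos++
	return v, nil
}

func main() {
	src := []*record{{values: []int{1, 2}}, {values: []int{3}}}
	r := newRows(src, 1)
	for {
		v, err := r.Next()
		if err == io.EOF {
			break
		}
		fmt.Println(v)
	}
	fmt.Println("first record released:", src[0].released)
}
```

One limitation of this sketch is that a consumer which stops early leaves the producer goroutine blocked on its send; a real implementation would presumably need some form of cancellation, for example via a `context.Context`.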
### Are These Changes Tested?
The proposed changes have been validated against existing tests.
### Are There Any User-Facing Changes?
No, there are no user-facing changes.