ariel-miculas commented on code in PR #20823:
URL: https://github.com/apache/datafusion/pull/20823#discussion_r2964811770


##########
datafusion/datasource-json/src/boundary_stream.rs:
##########
@@ -0,0 +1,842 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Streaming boundary-aligned wrapper for newline-delimited JSON range reads.
+//!
+//! [`AlignedBoundaryStream`] wraps a raw byte stream and lazily aligns to
+//! record (newline) boundaries, avoiding the need for separate `get_opts`
+//! calls to locate boundary positions.
+
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use bytes::Bytes;
+use futures::stream::{BoxStream, Stream};
+use futures::{StreamExt, TryFutureExt};
+use object_store::{GetOptions, GetRange, ObjectStore};
+
+/// Number of bytes past `raw_end` covered by the initial bounded fetch. If the
+/// terminating newline is not found within this window, `ScanningLastTerminator`
+/// issues successive same-sized GETs until the newline is located or EOF is reached.
+pub const END_SCAN_LOOKAHEAD: u64 = 16 * 1024; // 16 KiB
+
+/// Phase of the boundary alignment state machine that drives `AlignedBoundaryStream`.
+#[derive(Debug)]
+enum Phase {
+    /// Skipping bytes until the first newline is found, which aligns the start boundary.
+    ScanningFirstTerminator,
+    /// Passing aligned data through while tracking the byte position against the end boundary.
+    FetchingChunks,
+    /// Past the end boundary; scanning for the newline that terminates the last record.
+    ScanningLastTerminator,
+    /// Stream is exhausted.
+    Done,
+}
+
+/// A stream wrapper that lazily aligns byte boundaries to newline characters.
+///
+/// Given a raw byte stream starting from `fetch_start` (which is `start - 1`
+/// for non-zero starts, or `0`), this stream:
+///
+/// 1. Skips bytes until the first newline is found (start alignment)
+/// 2. Passes through data until the `end` boundary is reached
+/// 3. Continues past `end` to find the terminating newline (end alignment)
+///
+/// When the initial byte stream is exhausted during step 3 and the file has
+/// not been fully read, `ScanningLastTerminator` issues additional bounded
+/// `get_opts` calls (`END_SCAN_LOOKAHEAD` bytes each) until the newline is
+/// found or EOF is reached.
+pub struct AlignedBoundaryStream {
+    /// The raw byte stream currently being read; its first byte is at
+    /// offset `fetch_start`.
+    inner: BoxStream<'static, object_store::Result<Bytes>>,
+    /// Record terminator byte that boundaries are aligned to (the "newline"
+    /// referred to throughout these docs).
+    terminator: u8,
+    /// Effective end boundary. Set to `u64::MAX` when `end >= file_size`
+    /// (last partition), so `FetchingChunks` never transitions to
+    /// `ScanningLastTerminator` and simply streams to EOF.
+    end: u64,
+    /// Cumulative bytes consumed from `inner` (relative to `fetch_start`).
+    bytes_consumed: u64,
+    /// The offset where the current `inner` stream begins.
+    fetch_start: u64,
+    /// Current phase of the boundary alignment state machine.
+    phase: Phase,
+    /// Remainder bytes from `ScanningFirstTerminator` that still need
+    /// end-boundary processing. Consumed by `FetchingChunks` before polling
+    /// `inner`.
+    pending: Option<Bytes>,
+    /// Object store used for the additional bounded `get_opts` calls issued
+    /// while scanning for the terminating newline.
+    store: Arc<dyn ObjectStore>,
+    /// Path of the object being read (presumably passed to those `get_opts`
+    /// calls; the code issuing them is outside this excerpt).
+    location: object_store::path::Path,
+    /// Total file size; the extra fetches past the end boundary stop once
+    /// `abs_pos() >= file_size`.
+    file_size: u64,
+}
+
+/// Fetch a bounded byte range from `store` and return it as a stream.
+///
+/// Issues a single `get_opts` request for `range` at `location` and converts
+/// the result into a stream of `Bytes` chunks.
+///
+/// # Errors
+///
+/// Returns the `object_store` error if the `get_opts` request fails.
+async fn get_stream(
+    store: Arc<dyn ObjectStore>,
+    location: object_store::path::Path,
+    range: std::ops::Range<u64>,
+) -> object_store::Result<BoxStream<'static, object_store::Result<Bytes>>> {
+    // `range` is owned and not used again, so move it rather than cloning it.
+    let opts = GetOptions {
+        range: Some(GetRange::Bounded(range)),
+        ..Default::default()
+    };
+    let result = store.get_opts(&location, opts).await?;
+    Ok(result.into_stream())
+}
+
+impl AlignedBoundaryStream {
+    /// Open a ranged byte stream from `store` and return a ready-to-poll
+    /// `AlignedBoundaryStream`.
+    ///
+    /// Issues a single bounded `get_opts` call covering
+    /// `[fetch_start, raw_end + END_SCAN_LOOKAHEAD)`.  If the terminating
+    /// newline is not found within that window, `ScanningLastTerminator`
+    /// automatically issues additional `END_SCAN_LOOKAHEAD`-sized GETs
+    /// via `store` until the newline is found or EOF is reached.
+    pub async fn new(
+        store: Arc<dyn ObjectStore>,
+        location: object_store::path::Path,
+        raw_start: u64,
+        raw_end: u64,
+        file_size: u64,
+        terminator: u8,
+    ) -> object_store::Result<Self> {
+        if raw_start >= raw_end {

Review Comment:
   Yeah, I also removed them from the caller, so we don't have duplication.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to