jdcasale commented on code in PR #18457:
URL: https://github.com/apache/datafusion/pull/18457#discussion_r2490878586
##########
datafusion/datasource-arrow/src/file_format.rs:
##########
@@ -175,10 +182,39 @@ impl FileFormat for ArrowFormat {
async fn create_physical_plan(
&self,
- _state: &dyn Session,
+ state: &dyn Session,
conf: FileScanConfig,
) -> Result<Arc<dyn ExecutionPlan>> {
- let source = Arc::new(ArrowSource::default());
+ let is_stream_format = if let Some(first_group) = conf.file_groups.first() {
Review Comment:
Maybe worth pulling this out into a helper method that's easy to test. That would also make this method read a bit cleaner, with just an is_stream_format() check instead of a block of logic that isn't directly relevant to creating a physical plan.
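For illustration, something along these lines would keep create_physical_plan focused and be trivially unit-testable. (The name, signature, and magic-byte detection below are just my sketch of the idea, not necessarily how this PR detects the stream format.)
```rust
/// Sketch only (assumed signature and detection logic): Arrow IPC *files* begin
/// with the "ARROW1" magic bytes, while the stream format has no such header,
/// so a header without the magic is treated as the stream format.
fn is_stream_format(header_bytes: &[u8]) -> bool {
    const ARROW_MAGIC: [u8; 6] = *b"ARROW1";
    header_bytes.len() < ARROW_MAGIC.len()
        || header_bytes[..ARROW_MAGIC.len()] != ARROW_MAGIC
}

#[test]
fn detects_stream_vs_file() {
    assert!(!is_stream_format(b"ARROW1\0\0"));
    assert!(is_stream_format(&[0xff, 0xff, 0xff, 0xff])); // stream continuation marker
}
```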
##########
datafusion/datasource-arrow/src/source.rs:
##########
@@ -116,13 +117,150 @@ impl FileSource for ArrowSource {
}
}
-/// The struct arrow that implements `[FileOpener]` trait
-pub struct ArrowOpener {
+/// Arrow IPC Stream format source - supports only sequential reading
+#[derive(Clone, Default)]
+pub struct ArrowStreamSource {
+ metrics: ExecutionPlanMetricsSet,
+ projected_statistics: Option<Statistics>,
+ schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
+}
+
+impl From<ArrowStreamSource> for Arc<dyn FileSource> {
+ fn from(source: ArrowStreamSource) -> Self {
+ as_file_source(source)
+ }
+}
+
+impl FileSource for ArrowStreamSource {
+ fn create_file_opener(
+ &self,
+ object_store: Arc<dyn ObjectStore>,
+ base_config: &FileScanConfig,
+ _partition: usize,
+ ) -> Arc<dyn FileOpener> {
+ Arc::new(ArrowStreamOpener {
+ object_store,
+ projection: base_config.file_column_projection_indices(),
+ })
+ }
+
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
+ Arc::new(Self { ..self.clone() })
+ }
+
+ fn with_schema(&self, _schema: TableSchema) -> Arc<dyn FileSource> {
+ Arc::new(Self { ..self.clone() })
+ }
+
+ fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
+ let mut conf = self.clone();
+ conf.projected_statistics = Some(statistics);
+ Arc::new(conf)
+ }
+
+ fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
+ Arc::new(Self { ..self.clone() })
+ }
+
+ fn repartitioned(
+ &self,
+ _target_partitions: usize,
+ _repartition_file_min_size: usize,
+ _output_ordering: Option<datafusion_physical_expr_common::sort_expr::LexOrdering>,
+ _config: &FileScanConfig,
+ ) -> Result<Option<FileScanConfig>> {
+ // Stream format doesn't support range-based parallel reading
+ // because it lacks a footer that would be needed to make range-based
+ // seeking practical. Without that, you would either need to read
+ // the entire file and index it up front before doing parallel reading
+ // or else each partition would need to read the entire file up to the
+ // correct offset which is a lot of duplicate I/O. We're opting to avoid
+ // that entirely by only acting on a single partition and reading sequentially.
+ Ok(None)
+ }
Review Comment:
I'd argue that while this problem is worth solving, doing so is tangential to this change.
I'd like to see this solved, but I see no reason why we couldn't do it in a follow-on.
Probably worth documenting the practical consequences of leaving it in this state, though -- correct me if I'm wrong here, but I think this means we end up hydrating the entire file into memory for certain operations, right? That's probably not a good long-term state.
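At minimum, maybe a doc comment on the stream source along these lines (the wording below is just a suggestion on my part):
```rust
/// Note: the Arrow IPC stream format has no footer, so this source is never
/// repartitioned. Each stream file is read sequentially by a single partition,
/// which can mean reading (and buffering) the whole file for operations that
/// would otherwise be parallelized across partitions.
```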
##########
datafusion/datasource-arrow/src/file_format.rs:
##########
@@ -344,40 +382,68 @@ impl DataSink for ArrowFileSink {
}
}
+// Custom implementation of inferring schema. Should eventually be moved upstream to arrow-rs.
+// See <https://github.com/apache/arrow-rs/issues/5021>
+
const ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];
-/// Custom implementation of inferring schema. Should eventually be moved upstream to arrow-rs.
-/// See <https://github.com/apache/arrow-rs/issues/5021>
-async fn infer_schema_from_file_stream(
+async fn infer_ipc_schema(
mut stream: BoxStream<'static, object_store::Result<Bytes>>,
) -> Result<SchemaRef> {
- // Expected format:
- // <magic number "ARROW1"> - 6 bytes
- // <empty padding bytes [to 8 byte boundary]> - 2 bytes
- // <continuation: 0xFFFFFFFF> - 4 bytes, not present below v0.15.0
- // <metadata_size: int32> - 4 bytes
- // <metadata_flatbuffer: bytes>
- // <rest of file bytes>
-
- // So in first read we need at least all known sized sections,
- // which is 6 + 2 + 4 + 4 = 16 bytes.
+ // Expected IPC format is either:
+ //
+ // stream:
+ // <continuation: 0xFFFFFFFF> - 4 bytes (added in v0.15.0+)
+ // <metadata_size: int32> - 4 bytes
+ // <metadata_flatbuffer: bytes>
+ // <rest of file bytes>
+ //
+ // file:
+ // <magic number "ARROW1"> - 6 bytes
+ // <empty padding bytes [to 8 byte boundary]> - 2 bytes
+ // <stream format above>
+
+ // Perform the initial read such that we always have the metadata size
let bytes = collect_at_least_n_bytes(&mut stream, 16, None).await?;
- // Files should start with these magic bytes
- if bytes[0..6] != ARROW_MAGIC {
- return Err(ArrowError::ParseError(
- "Arrow file does not contain correct header".to_string(),
- ))?;
- }
-
- // Since continuation marker bytes added in later versions
- let (meta_len, rest_of_bytes_start_index) = if bytes[8..12] == CONTINUATION_MARKER {
- (&bytes[12..16], 16)
+ // The preamble size is everything before the metadata size
+ let preamble_size = if bytes[0..6] == ARROW_MAGIC {
+ // File format starts with magic number "ARROW1"
+ if bytes[8..12] == CONTINUATION_MARKER {
+ // Continuation marker was added in v0.15.0
+ 12
+ } else {
+ // File format before v0.15.0
+ 8
+ }
+ } else if bytes[0..4] == CONTINUATION_MARKER {
+ // Stream format after v0.15.0 starts with continuation marker
+ 4
} else {
- (&bytes[8..12], 12)
+ // Stream format before v0.15.0 does not have a preamble
+ 0
};
+ infer_ipc_schema_ignoring_preamble_bytes(bytes, preamble_size, stream).await
+}
+
+async fn infer_ipc_schema_ignoring_preamble_bytes(
+ bytes: Vec<u8>,
+ preamble_size: usize,
+ mut stream: BoxStream<'static, object_store::Result<Bytes>>,
+) -> Result<SchemaRef> {
+ let (meta_len, rest_of_bytes_start_index): ([u8; 4], usize) = (
+ bytes[preamble_size..preamble_size + 4]
+ .try_into()
+ .map_err(|err| {
+ ArrowError::ParseError(format!(
+ "Unable to read IPC message as metadata length: {err:?}"
+ ))
+ })?,
+ preamble_size + 4,
+ );
Review Comment:
Am I reading this right that `rest_of_bytes_start_index` is always just `preamble_size + 4`?
If that's the case, it may be clearer to do two separate assignments, i.e.
```suggestion
    let meta_len: [u8; 4] = bytes[preamble_size..preamble_size + 4]
        .try_into()
        .map_err(|err| {
            ArrowError::ParseError(format!(
                "Unable to read IPC message as metadata length: {err:?}"
            ))
        })?;
    let rest_of_bytes_start_index: usize = preamble_size + 4;
```
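For context (my sketch, not this PR's code): the Arrow IPC spec stores that metadata length as a little-endian 32-bit integer, so the decode that presumably follows those 4 bytes would look roughly like this:
```rust
// Sketch only: decode the 4-byte metadata length read above
// (little-endian per the Arrow IPC encapsulated-message format).
fn decode_metadata_len(meta_len: [u8; 4]) -> usize {
    u32::from_le_bytes(meta_len) as usize
}
```
e.g. `decode_metadata_len([0x10, 0x00, 0x00, 0x00])` yields 16, the number of flatbuffer metadata bytes that follow.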
##########
datafusion/datasource-arrow/src/file_format.rs:
##########
@@ -427,7 +493,8 @@ async fn collect_at_least_n_bytes(
if buf.len() < n {
return Err(ArrowError::ParseError(
"Unexpected end of byte stream for Arrow IPC file".to_string(),
- ))?;
+ )
+ .into());
Review Comment:
Why?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]