2010YOUY01 commented on code in PR #18457:
URL: https://github.com/apache/datafusion/pull/18457#discussion_r2502618476
##########
datafusion/datasource-arrow/src/source.rs:
##########
@@ -116,13 +117,150 @@ impl FileSource for ArrowSource {
}
}
-/// The struct arrow that implements `[FileOpener]` trait
-pub struct ArrowOpener {
+/// Arrow IPC Stream format source - supports only sequential reading
+#[derive(Clone, Default)]
+pub struct ArrowStreamSource {
+ metrics: ExecutionPlanMetricsSet,
+ projected_statistics: Option<Statistics>,
+ schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
+}
+
+impl From<ArrowStreamSource> for Arc<dyn FileSource> {
+ fn from(source: ArrowStreamSource) -> Self {
+ as_file_source(source)
+ }
+}
+
+impl FileSource for ArrowStreamSource {
+ fn create_file_opener(
+ &self,
+ object_store: Arc<dyn ObjectStore>,
+ base_config: &FileScanConfig,
+ _partition: usize,
+ ) -> Arc<dyn FileOpener> {
+ Arc::new(ArrowStreamOpener {
+ object_store,
+ projection: base_config.file_column_projection_indices(),
+ })
+ }
+
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
+ Arc::new(Self { ..self.clone() })
+ }
+
+ fn with_schema(&self, _schema: TableSchema) -> Arc<dyn FileSource> {
+ Arc::new(Self { ..self.clone() })
+ }
+
+ fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
+ let mut conf = self.clone();
+ conf.projected_statistics = Some(statistics);
+ Arc::new(conf)
+ }
+
+ fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
+ Arc::new(Self { ..self.clone() })
+ }
+
+ fn repartitioned(
+ &self,
+ _target_partitions: usize,
+ _repartition_file_min_size: usize,
+ _output_ordering: Option<datafusion_physical_expr_common::sort_expr::LexOrdering>,
+ _config: &FileScanConfig,
+ ) -> Result<Option<FileScanConfig>> {
+ // Stream format doesn't support range-based parallel reading
+ // because it lacks a footer that would be needed to make range-based
+ // seeking practical. Without that, you would either need to read
+ // the entire file and index it up front before doing parallel reading,
+ // or else each partition would need to read the entire file up to the
+ // correct offset, which is a lot of duplicate I/O. We're opting to avoid
+ // that entirely by only acting on a single partition and reading sequentially.
+ Ok(None)
+ }
Review Comment:
I think partitioning is doable, but it's better done later if anyone has a
real use case.
In order to repartition, this function would have to scan the file once,
record the dictionary and batch positions, then split the work evenly across
parallel partitioned workers -- this indexing pass can run at around full disk
bandwidth (~5 GB/s on recent MacBooks).
Decoding the batches from an Arrow IPC Stream file into in-memory Arrow
`RecordBatch`es can be much slower if dictionary encoding and heavyweight
compression like zstd are applied (several hundred MB/s).
So the up-front full scan is still worth it to speed up the overall processing
with partitioning, though I don't know whether querying large IPC Stream files
is a common requirement.
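
For illustration, a rough sketch of that indexing pass: walk the stream's
encapsulated-message framing and record the offset of each message without
decoding the bodies. This is a hypothetical helper (`index_ipc_stream`), not
part of this PR; it assumes the post-0.15 framing (0xFFFFFFFF continuation
marker plus a 4-byte little-endian metadata length) and the `arrow-ipc`
crate's generated flatbuffers accessors (`root_as_message`, `bodyLength`):

```rust
// Hypothetical indexing pass (not part of this PR): records the byte offset
// and header type of every message in an Arrow IPC Stream file by walking
// the encapsulated-message framing, without decoding the message bodies.
use arrow_ipc::{root_as_message, MessageHeader};
use std::fs::File;
use std::io::{Error, ErrorKind, Read, Result, Seek, SeekFrom};

fn index_ipc_stream(path: &str) -> Result<Vec<(u64, MessageHeader)>> {
    let mut file = File::open(path)?;
    let mut offsets = Vec::new();
    loop {
        let pos = file.stream_position()?;
        let mut prefix = [0u8; 4];
        if file.read_exact(&mut prefix).is_err() {
            break; // EOF without an explicit end-of-stream marker
        }
        if prefix != [0xFF; 4] {
            break; // pre-0.15 framing (or corruption); bail out
        }
        let mut len = [0u8; 4];
        file.read_exact(&mut len)?;
        let meta_len = u32::from_le_bytes(len) as usize;
        if meta_len == 0 {
            break; // continuation marker followed by 0 is end-of-stream
        }
        // Read only the flatbuffer metadata; it tells us the body length.
        let mut meta = vec![0u8; meta_len];
        file.read_exact(&mut meta)?;
        let message = root_as_message(&meta)
            .map_err(|e| Error::new(ErrorKind::InvalidData, e.to_string()))?;
        offsets.push((pos, message.header_type()));
        // Skip the message body without decoding it.
        file.seek(SeekFrom::Current(message.bodyLength()))?;
    }
    Ok(offsets)
}
```

With those offsets in hand, `repartitioned` could hand each worker a byte
range plus the dictionary messages its batches depend on; whether that pays
off depends on how often large IPC Stream files are actually queried.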