jorgecarleitao commented on a change in pull request #8482:
URL: https://github.com/apache/arrow/pull/8482#discussion_r507178148



##########
File path: rust/arrow/src/csv/reader.rs
##########
@@ -304,162 +311,180 @@ impl<R: Read> Reader<R> {
 
         let csv_reader = reader_builder.from_reader(buf_reader);
         let record_iter = csv_reader.into_records();
+
+        let (start, end) = match bounds {
+            None => (0, usize::MAX),
+            Some((start, end)) => (start, end),
+        };
+        // Create an iterator that:
+        // * skips the first `start` items
+        // * runs up to `end` items
+        // * buffers `batch_size` items
+        // note that this skips by iteration. This is because in general it is 
not possible
+        // to seek in CSV. However, skipping still saves the burden of creating 
arrow arrays,
+        // which is a slow operation that scales with the number of columns
+        let record_iter = Buffered::new(record_iter.skip(start).take(end), 
batch_size);
+
         Self {
             schema,
             projection,
             record_iter,
-            batch_size,
-            line_number: if has_header { 1 } else { 0 },
+            line_number: if has_header { start + 1 } else { start + 0 },
         }
     }
-
-    fn parse(&self, rows: &[StringRecord]) -> Result<RecordBatch> {
-        let projection: Vec<usize> = match self.projection {
-            Some(ref v) => v.clone(),
-            None => self
-                .schema
-                .fields()
-                .iter()
-                .enumerate()
-                .map(|(i, _)| i)
-                .collect(),
-        };
-
-        let arrays: Result<Vec<ArrayRef>> = projection
-            .iter()
-            .map(|i| {
-                let i = *i;
-                let field = self.schema.field(i);
-                match field.data_type() {
-                    &DataType::Boolean => {
-                        self.build_primitive_array::<BooleanType>(rows, i)
-                    }
-                    &DataType::Int8 => 
self.build_primitive_array::<Int8Type>(rows, i),
-                    &DataType::Int16 => 
self.build_primitive_array::<Int16Type>(rows, i),
-                    &DataType::Int32 => 
self.build_primitive_array::<Int32Type>(rows, i),
-                    &DataType::Int64 => 
self.build_primitive_array::<Int64Type>(rows, i),
-                    &DataType::UInt8 => 
self.build_primitive_array::<UInt8Type>(rows, i),
-                    &DataType::UInt16 => {
-                        self.build_primitive_array::<UInt16Type>(rows, i)
-                    }
-                    &DataType::UInt32 => {
-                        self.build_primitive_array::<UInt32Type>(rows, i)
-                    }
-                    &DataType::UInt64 => {
-                        self.build_primitive_array::<UInt64Type>(rows, i)
-                    }
-                    &DataType::Float32 => {
-                        self.build_primitive_array::<Float32Type>(rows, i)
-                    }
-                    &DataType::Float64 => {
-                        self.build_primitive_array::<Float64Type>(rows, i)
-                    }
-                    &DataType::Utf8 => {
-                        let mut builder = StringBuilder::new(rows.len());
-                        for row in rows.iter() {
-                            match row.get(i) {
-                                Some(s) => builder.append_value(s).unwrap(),
-                                _ => builder.append(false).unwrap(),
-                            }
-                        }
-                        Ok(Arc::new(builder.finish()) as ArrayRef)
-                    }
-                    other => Err(ArrowError::ParseError(format!(
-                        "Unsupported data type {:?}",
-                        other
-                    ))),
-                }
-            })
-            .collect();
-
-        let schema_fields = self.schema.fields();
-
-        let projected_fields: Vec<Field> = projection
-            .iter()
-            .map(|i| schema_fields[*i].clone())
-            .collect();
-
-        let projected_schema = Arc::new(Schema::new(projected_fields));
-
-        arrays.and_then(|arr| RecordBatch::try_new(projected_schema, arr))
-    }
-
-    fn build_primitive_array<T: ArrowPrimitiveType>(
-        &self,
-        rows: &[StringRecord],
-        col_idx: usize,
-    ) -> Result<ArrayRef> {
-        let is_boolean_type =
-            *self.schema.field(col_idx).data_type() == DataType::Boolean;
-
-        rows.iter()
-            .enumerate()
-            .map(|(row_index, row)| {
-                match row.get(col_idx) {
-                    Some(s) => {
-                        if s.is_empty() {
-                            return Ok(None);
-                        }
-                        let parsed = if is_boolean_type {
-                            s.to_lowercase().parse::<T::Native>()
-                        } else {
-                            s.parse::<T::Native>()
-                        };
-                        match parsed {
-                            Ok(e) => Ok(Some(e)),
-                            Err(_) => Err(ArrowError::ParseError(format!(
-                                // TODO: we should surface the underlying 
error here.
-                                "Error while parsing value {} for column {} at 
line {}",
-                                s,
-                                col_idx,
-                                self.line_number + row_index
-                            ))),
-                        }
-                    }
-                    None => Ok(None),
-                }
-            })
-            .collect::<Result<PrimitiveArray<T>>>()
-            .map(|e| Arc::new(e) as ArrayRef)
-    }
 }
 
 impl<R: Read> Iterator for Reader<R> {
     type Item = Result<RecordBatch>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        // read a batch of rows into memory
-        let mut rows: Vec<StringRecord> = Vec::with_capacity(self.batch_size);
-        for i in 0..self.batch_size {
-            match self.record_iter.next() {
-                Some(Ok(r)) => {
-                    rows.push(r);
-                }
-                Some(Err(e)) => {
-                    return Some(Err(ArrowError::ParseError(format!(
-                        "Error parsing line {}: {:?}",
-                        self.line_number + i,
-                        e
-                    ))));
-                }
-                None => break,
+        let rows = match self.record_iter.next() {
+            Some(Ok(r)) => r,
+            Some(Err(e)) => {
+                return Some(Err(ArrowError::ParseError(format!(
+                    "Error parsing line {}: {:?}",
+                    self.line_number + self.record_iter.n(),
+                    e
+                ))));
             }
-        }
+            None => return None,
+        };
 
         // return early if no data was loaded
         if rows.is_empty() {
             return None;
         }
 
         // parse the batches into a RecordBatch
-        let result = self.parse(&rows);
+        let result = parse(
+            &rows,
+            &self.schema.fields(),
+            &self.projection,
+            self.line_number,
+        );
 
         self.line_number += rows.len();
 
         Some(result)
     }
 }
 
+/// parses a slice of [csv_crate::StringRecord] into a RecordBatch.
+pub fn parse(

Review comment:
       Good catch. I reverted this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to