Thanks for the context! That makes sense.
Even so, when comparing the total time of each (read + filter), DataFusion
still appears much slower (~625ms vs 33ms). Is that expected?
Also, I'm now trying to bring the table into memory to perform the computation
from there and compare performance. Code below, but I'm getting an error
(beneath the code) even though I think I've constructed the MemTable correctly
(based on [1]). From what I can see, all the types are the same as when I used
the original df from read_csv, so I'm not sure what I'm doing wrong.
I also saw there is an open issue [2] for this error type on rust-lang, so I'm
unsure whether it's my implementation, a datafusion/arrow issue, or a Rust issue.
Thanks again for the help!
```
let sch = Arc::new(df.schema());
let batches = vec![df.collect().await?];
let provider = MemTable::new(sch, batches)?;
ctx.register_table("memory_table", Box::new(provider));
let mem_df = ctx.table("memory_table")?;

let q_start = Instant::now();
let results = mem_df
    .filter(col("userId").eq(lit(1)))?
    .collect()
    .await
    .unwrap();
```
which returns this error:
```
error[E0698]: type inside `async` block must be known in this context
 --> src\main.rs:37:38
  |
37 |     .filter(col("userId").eq(lit(1)))?
  |                                  ^ cannot infer type for type `{integer}`
  |
note: the type is part of the `async` block because of this `await`
 --> src\main.rs:36:19
  |
36 |       let results = mem_df
  |  ___________________^
37 | |     .filter(col("userId").eq(lit(1)))?
38 | |     .collect()
39 | |     .await
  | |______________^
```
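(Side note: from reading around that rust-lang issue, one workaround people mention for E0698 is giving the compiler a concrete type to work with up front, e.g. writing the literal with a suffix like `lit(1i64)` instead of `lit(1)`. A minimal standalone sketch of the idea, outside DataFusion — the generic `lit` here is just a stand-in for an expression builder, not the real DataFusion function:)

```rust
// Stand-in for a builder that is generic over the literal's type,
// loosely analogous to datafusion's lit().
fn lit<T>(v: T) -> T {
    v
}

fn main() {
    let fut = async {
        // A bare `lit(1)` can leave `{integer}` uninferred inside an async
        // block; the `1i64` suffix pins the type before inference runs.
        let v = lit(1i64);
        v
    };
    // The future is never polled; constructing it is enough for this sketch.
    drop(fut);

    println!("{}", lit(1i64)); // prints 1
}
```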
[1] https://github.com/apache/arrow/blob/master/rust/datafusion/examples/dataframe_in_memory.rs
[2] https://github.com/rust-lang/rust/issues/63502
Matthew M. Turner
Email: [email protected]
Phone: (908)-868-2786
-----Original Message-----
From: Michael Mior <[email protected]>
Sent: Thursday, December 10, 2020 8:55 PM
To: [email protected]
Subject: Re: [Rust] DataFusion performance
Contrary to what you might expect given the name, read_csv does not actually
read the CSV file. It instead creates the start of a logical execution plan
which involves reading the CSV file when that plan is finally executed. This
happens when you call collect().
Pandas read_csv on the other hand immediately reads the CSV file. So you're
comparing the time of reading AND filtering the file
(DataFusion) with the time to filter data which has already been read (Pandas).
There's nothing wrong with your use of DataFusion per se; you simply weren't
measuring what you thought you were measuring.
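To illustrate with plain Rust (no DataFusion): iterator adapters behave the same way. Building the chain costs nothing, and the work happens only when it is consumed, just as read_csv only builds a plan and collect() executes it. A rough sketch:

```rust
fn main() {
    let data: Vec<i64> = (1..=1_000).collect();

    // "Lazy" stage: nothing is scanned yet, analogous to ctx.read_csv()
    // returning a logical plan without touching the file.
    let plan = data.iter().filter(|&&x| x % 7 == 0);

    // "Execution" stage: the scan and the filter both run here,
    // analogous to collect().await doing the read AND the filter.
    let matches: Vec<i64> = plan.copied().collect();

    println!("{}", matches.len()); // 142 multiples of 7 in 1..=1000
}
```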
--
Michael Mior
[email protected]
On Thu, Dec 10, 2020 at 5:11 PM, Matthew Turner <[email protected]> wrote:
>
> Hello,
>
> I’ve been playing around with DataFusion to explore the feasibility of
> replacing current python/pandas data processing jobs with Rust/DataFusion,
> ultimately looking to improve performance / decrease cost.
>
> I was doing some simple tests to start measuring performance differences on
> a simple task (read a csv [1] and filter it).
>
> Reading the csv, DataFusion seemed to outperform pandas by around 30%, which
> was nice.
>
> * Rust took around 20-25ms to read the csv (compared to 32ms from pandas)
>
> However, when filtering the data I was surprised to see that pandas was way
> faster.
>
> * Rust took around 500-600ms to filter the csv (compared to 1ms from pandas)
>
> My code for each is below. I know I should be running the DataFusion timings
> through something like Python's %timeit, but I didn't have that immediately
> accessible, and I ran the code many times to confirm the results were
> roughly consistent.
>
> Is this performance expected? Or am I using DataFusion incorrectly?
>
> Any insight is much appreciated!
>
> [Rust]
> ```
> use datafusion::error::Result;
> use datafusion::prelude::*;
> use std::time::Instant;
>
> #[tokio::main]
> async fn main() -> Result<()> {
>     let start = Instant::now();
>
>     let mut ctx = ExecutionContext::new();
>     let ratings_csv = "ratings_small.csv";
>
>     let df = ctx.read_csv(ratings_csv, CsvReadOptions::new()).unwrap();
>     println!("Read CSV Duration: {:?}", start.elapsed());
>
>     let q_start = Instant::now();
>     let results = df
>         .filter(col("userId").eq(lit(1)))?
>         .collect()
>         .await
>         .unwrap();
>     println!("Filter duration: {:?}", q_start.elapsed());
>
>     println!("Duration: {:?}", start.elapsed());
>
>     Ok(())
> }
> ```
>
> [Python]
> ```
> In [1]: %timeit df = pd.read_csv("ratings_small.csv")
> 32.4 ms ± 210 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
>
> In [2]: %timeit df.query("userId==1")
> 1.16 ms ± 24.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
> ```
>
> [1]: https://www.kaggle.com/rounakbanik/the-movies-dataset?select=ratings.csv
>
> Matthew M. Turner
> Email: [email protected]
> Phone: (908)-868-2786