Hello Matthew,

If you want to get the absolute best performance you can right now from DataFusion:

* Make sure you are using the latest version from master; there have been a lot of improvements lately.
* Compile DataFusion with the "simd" feature on. This requires a recent version of DataFusion, but it gives speedups for some computations.
* Compile your code with lto = true, like this in your Cargo.toml file:

  [profile.release]
  lto = true

  This will increase the compile time considerably, but it allows Rust/LLVM to do more optimizations on the entire program. There are some other settings documented here: https://doc.rust-lang.org/cargo/reference/profiles.html#release
* Set the environment variable RUSTFLAGS="-C target-cpu=native". This allows Rust/LLVM to use all CPU instructions available on your CPU, although the binary is no longer portable that way.

We are also improving performance over time; a lot of parts of Arrow / DataFusion have been improved in the last months, such as a faster CSV reader and faster computations, and there is still a lot to come.

Best,
Daniël

On Sat, Dec 12, 2020 at 05:06, Jorge Cardoso Leitão <[email protected]> wrote:

> Hi Matthew,
>
> SchemaRef is just an alias for Arc<Schema>. Thus, you need to wrap it in
> an Arc.
>
> We do this because the plans are often passed between thread boundaries,
> and wrapping them in an Arc allows that.
>
> Best,
> Jorge
>
>
> On Fri, Dec 11, 2020 at 8:14 PM Matthew Turner <[email protected]> wrote:
>
>> Thanks! Converting the schema to owned made it work.
>>
>> The type of the schema param is SchemaRef – which I thought would allow a
>> reference. Is this not the case?
>>
>> Matthew M. Turner
>> Email: [email protected]
>> Phone: (908)-868-2786
>>
>> From: Andy Grove <[email protected]>
>> Sent: Friday, December 11, 2020 10:16 AM
>> To: [email protected]
>> Subject: Re: [Rust] DataFusion performance
>>
>> Hi Matthew,
>>
>> Using the latest DataFusion from the GitHub master branch, the following
>> code works for in-memory:
>>
>> use std::sync::Arc;
>> use std::time::Instant;
>>
>> use datafusion::error::Result;
>> use datafusion::prelude::*;
>> use datafusion::datasource::MemTable;
>>
>> #[tokio::main]
>> async fn main() -> Result<()> {
>>     // TODO add command-line args
>>     let ratings_csv = "/tmp/movies/ratings_small.csv";
>>     let mut ctx = ExecutionContext::new();
>>     let df = ctx.read_csv(ratings_csv, CsvReadOptions::new()).unwrap();
>>     let batches = vec![df.collect().await?];
>>     let provider = MemTable::new(Arc::new(df.schema().to_owned().into()), batches)?;
>>     ctx.register_table("memory_table", Box::new(provider));
>>     let mem_df = ctx.table("memory_table")?;
>>     let q_start = Instant::now();
>>     let _results = mem_df
>>         .filter(col("userId").eq(lit(1)))?
>>         .collect()
>>         .await
>>         .unwrap();
>>     println!("Duration: {:?}", q_start.elapsed());
>>     Ok(())
>> }
>>
>> Andy.
>>
>> On Fri, Dec 11, 2020 at 7:59 AM Matthew Turner <[email protected]> wrote:
>>
>> Played around some more - it was because I wasn't using the --release flag.
>> Sorry about that, still learning Rust.
>>
>> Using that flag, the total time to read and filter is between 52 and 80ms.
>>
>> In general, what should I expect when comparing the performance of pandas
>> to DataFusion?
>>
>> @Andy Grove thanks for adding that. If there is a need for additional
>> DataFusion benchmarks and what I do could help with that, then I would be
>> happy to contribute it. I will send a follow-up once I've made progress.
>>
>> I'm also still having trouble with that memory table, so any help there
>> is appreciated.
>>
>> Thanks for your time! Very excited by this.
>>
>> Matthew M. Turner
>> Email: [email protected]
>> Phone: (908)-868-2786
>>
>> -----Original Message-----
>> From: Matthew Turner <[email protected]>
>> Sent: Friday, December 11, 2020 12:24 AM
>> To: [email protected]
>> Subject: RE: [Rust] DataFusion performance
>>
>> Thanks for the context! Makes sense.
>>
>> Even with that, when comparing the total time of each (read + filter),
>> DataFusion still appears much slower (~625ms vs 33ms). Is that expected?
>>
>> Also, I'm trying to bring the table into memory now to perform the
>> computation from there and compare performance. Code below. But I'm
>> getting an error (beneath the code) even though I think I've constructed
>> the MemTable correctly (from [1]). From what I see, all the types are the
>> same as when I used the original df from read_csv, so I'm not sure what
>> I'm doing wrong.
>>
>> I also saw there was an open issue [2] for this error type raised on
>> rust-lang, so I'm unsure whether it's my implementation, a DataFusion/Arrow
>> issue, or a Rust issue.
>>
>> Thanks again for the help!
>>
>> ```
>> let sch = Arc::new(df.schema());
>> let batches = vec![df.collect().await?];
>> let provider = MemTable::new(sch, batches)?;
>>
>> ctx.register_table("memory_table", Box::new(provider));
>>
>> let mem_df = ctx.table("memory_table")?;
>>
>> let q_start = Instant::now();
>> let results = mem_df
>>     .filter(col("userId").eq(lit(1)))?
>>     .collect()
>>     .await
>>     .unwrap();
>> ```
>>
>> Which is returning this error:
>>
>> error[E0698]: type inside `async` block must be known in this context
>>   --> src\main.rs:37:38
>>    |
>> 37 |         .filter(col("userId").eq(lit(1)))?
>>    |                                      ^ cannot infer type for type `{integer}`
>>    |
>> note: the type is part of the `async` block because of this `await`
>>   --> src\main.rs:36:19
>>    |
>> 36 |       let results = mem_df
>>    |  ___________________^
>> 37 | |         .filter(col("userId").eq(lit(1)))?
>> 38 | |         .collect()
>> 39 | |         .await
>>    | |______________^
>>
>> [1] https://github.com/apache/arrow/blob/master/rust/datafusion/examples/dataframe_in_memory.rs
>> [2] https://github.com/rust-lang/rust/issues/63502
>>
>> Matthew M. Turner
>> Email: [email protected]
>> Phone: (908)-868-2786
>>
>> -----Original Message-----
>> From: Michael Mior <[email protected]>
>> Sent: Thursday, December 10, 2020 8:55 PM
>> To: [email protected]
>> Subject: Re: [Rust] DataFusion performance
>>
>> Contrary to what you might expect given the name, read_csv does not
>> actually read the CSV file. It instead creates the start of a logical
>> execution plan, which involves reading the CSV file when that plan is
>> finally executed. This happens when you call collect().
>>
>> Pandas' read_csv, on the other hand, immediately reads the CSV file. So
>> you're comparing the time of reading AND filtering the file (DataFusion)
>> with the time to filter data which has already been read (pandas).
>>
>> There's nothing wrong with your use of DataFusion per se; you simply
>> weren't measuring what you thought you were measuring.
>> --
>> Michael Mior
>> [email protected]
>>
>> On Thu, Dec 10, 2020 at 17:11, Matthew Turner <[email protected]> wrote:
>> >
>> > Hello,
>> >
>> > I've been playing around with DataFusion to explore the feasibility of
>> > replacing current Python/pandas data processing jobs with Rust/DataFusion,
>> > ultimately looking to improve performance / decrease cost.
>> >
>> > I was doing some simple tests to start measuring performance
>> > differences on a simple task (read a CSV [1] and filter it).
>> >
>> > Reading the CSV, DataFusion seemed to outperform pandas by around 30%,
>> > which was nice.
>> >
>> > * Rust took around 20-25ms to read the CSV (compared to 32ms from
>> > pandas)
>> >
>> > However, when filtering the data I was surprised to see that pandas was
>> > way faster.
>> > * Rust took around 500-600ms to filter the CSV (compared to 1ms from
>> > pandas)
>> >
>> > My code for each is below. I know I should be running the DataFusion
>> > timings through something similar to Python's %timeit, but I didn't have
>> > that immediately accessible, and I ran it many times to confirm it was
>> > roughly consistent.
>> >
>> > Is this performance expected? Or am I using DataFusion incorrectly?
>> >
>> > Any insight is much appreciated!
>> >
>> > [Rust]
>> >
>> > ```
>> > use datafusion::error::Result;
>> > use datafusion::prelude::*;
>> > use std::time::Instant;
>> >
>> > #[tokio::main]
>> > async fn main() -> Result<()> {
>> >     let start = Instant::now();
>> >
>> >     let mut ctx = ExecutionContext::new();
>> >
>> >     let ratings_csv = "ratings_small.csv";
>> >
>> >     let df = ctx.read_csv(ratings_csv, CsvReadOptions::new()).unwrap();
>> >     println!("Read CSV Duration: {:?}", start.elapsed());
>> >
>> >     let q_start = Instant::now();
>> >     let results = df
>> >         .filter(col("userId").eq(lit(1)))?
>> >         .collect()
>> >         .await
>> >         .unwrap();
>> >     println!("Filter duration: {:?}", q_start.elapsed());
>> >
>> >     println!("Duration: {:?}", start.elapsed());
>> >
>> >     Ok(())
>> > }
>> > ```
>> >
>> > [Python]
>> >
>> > ```
>> > In [1]: df = pd.read_csv("ratings_small.csv")
>> > 32.4 ms ± 210 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
>> >
>> > In [2]: df.query("userId==1")
>> > 1.16 ms ± 24.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
>> > ```
>> >
>> > [1]: https://www.kaggle.com/rounakbanik/the-movies-dataset?select=ratings.csv
>> >
>> > Matthew M. Turner
>> > Email: [email protected]
>> > Phone: (908)-868-2786

-- 
Daniël Heres
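[Editor's note] Daniël's build-time suggestions at the top of the thread can be combined roughly as below. This is a sketch, not verified against a particular DataFusion version; the "simd" feature name and the flags are taken from the email as written:

```shell
# In the project's Cargo.toml (sketch):
#
#   [dependencies]
#   datafusion = { version = "...", features = ["simd"] }  # "simd" feature per the email
#
#   [profile.release]
#   lto = true   # much slower compile, but more whole-program optimization

# Build with every instruction the local CPU supports enabled.
# Note: the resulting binary is no longer portable to older CPUs.
RUSTFLAGS="-C target-cpu=native" cargo build --release
```

The `--release` flag matters on its own, as Matthew found: debug builds of Rust numeric code are routinely an order of magnitude slower.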
