Weston Pace reassigned ARROW-15069:
-----------------------------------

    Assignee: Weston Pace

> [R] open_dataset very slow on heavily partitioned parquet dataset
> -----------------------------------------------------------------
>
>                 Key: ARROW-15069
>                 URL: https://issues.apache.org/jira/browse/ARROW-15069
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: R
>    Affects Versions: 6.0.1
>        Environment: macOS Mojave, R 4.1.1
>           Reporter: Andy Teucher
>           Assignee: Weston Pace
>           Priority: Minor
>
> Opening a particular hive-partitioned Parquet dataset is very slow (45 s to 1 minute). The reproducible example below takes 780 csv files and writes them to Parquet using the {{open_dataset("csv files") |> group_by(vars) |> write_dataset("parquet")}} pattern suggested [here|https://arrow.apache.org/docs/r/articles/dataset.html#writing-datasets]. Opening and querying the resulting Parquet dataset is much slower than running the same query against the original csv files, which is not what I expected.
> {code:java}
> library(arrow)
> library(dplyr)
> library(tictoc)
>
> zipfile <- "ahccd.zip"
> csv_dir <- "data/csv"
> parquet_dir <- "data/parquet"
> dir.create(csv_dir, recursive = TRUE)
> dir.create(parquet_dir, recursive = TRUE)
>
> # A zip of 780 csvs of daily temperature data at Canadian climate
> # stations (one file per station)
> download.file("https://www.dropbox.com/s/f0a18jp0lvbp1hp/ahccd.zip?dl=1",
>               destfile = zipfile)
> unzip(zipfile, exdir = csv_dir)
>
> csv_schema <- schema(
>   field("stn_id", string()),
>   field("stn_name", string()),
>   field("measure", string()),
>   field("date", date32()),
>   field("year", int64()),
>   field("month", int64()),
>   field("temp", double()),
>   field("province", string()),
>   field("stn_joined", string()),
>   field("element", string()),
>   field("unit", string()),
>   field("stn_last_updated", string()),
>   field("flag", string())
> )
>
> csv_format <- FileFormat$create(format = "csv", quoting = FALSE)
>
> # Write to parquet, partitioning on stn_id, year, measure
> tic("write csv to parquet")
> arrow::open_dataset("data/csv", schema = csv_schema, format = csv_format) |>
>   group_by(stn_id, year, measure) |>
>   write_dataset(parquet_dir, format = "parquet")
> toc()
> #> write csv to parquet: 2067.093 sec elapsed
>
> stations <- c("1100031", "1100120", "1100119", "1036B06")
>
> ## Query directory of original csv files
> tic("query csv")
> foo <- arrow::open_dataset(csv_dir, schema = csv_schema, format = csv_format) |>
>   filter(year >= 1990,
>          year <= 2020,
>          stn_id %in% stations,
>          measure == "daily_max") |>
>   collect()
> toc()
> #> query csv: 12.571 sec elapsed
>
> ## Query the hive-style parquet directory
> tic("query parquet")
> bar <- arrow::open_dataset("data/parquet") |>
>   filter(year >= 1990,
>          year <= 2020,
>          stn_id %in% stations,
>          measure == "daily_max") |>
>   collect()
> toc()
> #> query parquet: 41.79 sec elapsed
>
> ## It turns out that it is just the opening of the dataset
> ## that takes so long
> tic("open parquet dataset")
> ds <- arrow::open_dataset("~/Desktop/arrow-report/data/parquet")
> toc()
> #> open parquet dataset: 45.581 sec elapsed
>
> ds
> #> FileSystemDataset with 191171 Parquet files
> #> stn_name: string
> #> date: date32[day]
> #> month: int64
> #> temp: double
> #> province: string
> #> stn_joined: string
> #> element: string
> #> unit: string
> #> stn_last_updated: string
> #> flag: string
> #> stn_id: string
> #> year: int32
> #> measure: string
>
> tic("query already opened dataset")
> ds |>
>   filter(year >= 1990,
>          year <= 2020,
>          stn_id %in% stations,
> measure == "daily_max") |> > collect() > #> # A tibble: 44,469 × 13 > #> stn_name date month temp province stn_joined element > unit > #> <chr> <date> <int> <dbl> <chr> <chr> <chr> > <chr> > #> 1 ALBERNI 1992-01-01 1 6.5 BC station joined Homogenized d… > Deg C… > #> 2 ALBERNI 1992-01-02 1 5.5 BC station joined Homogenized d… > Deg C… > #> 3 ALBERNI 1992-01-03 1 3.5 BC station joined Homogenized d… > Deg C… > #> 4 ALBERNI 1992-01-04 1 6 BC station joined Homogenized d… > Deg C… > #> 5 ALBERNI 1992-01-05 1 0.5 BC station joined Homogenized d… > Deg C… > #> 6 ALBERNI 1992-01-06 1 0 BC station joined Homogenized d… > Deg C… > #> 7 ALBERNI 1992-01-07 1 0 BC station joined Homogenized d… > Deg C… > #> 8 ALBERNI 1992-01-08 1 1.5 BC station joined Homogenized d… > Deg C… > #> 9 ALBERNI 1992-01-09 1 4 BC station joined Homogenized d… > Deg C… > #> 10 ALBERNI 1992-01-10 1 5.5 BC station joined Homogenized d… > Deg C… > #> # … with 44,459 more rows, and 5 more variables: stn_last_updated <chr>, > #> # flag <chr>, stn_id <chr>, year <int>, measure <chr> > toc() > #> query already openend dataset: 0.356 sec elapsed > {code} > The above reprex is self-contained, but will take a while to run, > specifically the writing of the parquet dataset can take up to 30 min. It > also downloads a 380MB zip file of csvs from my Dropbox. -- This message was sent by Atlassian Jira (v8.20.1#820001)