[ 
https://issues.apache.org/jira/browse/ARROW-15069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17457394#comment-17457394
 ] 

Andy Teucher commented on ARROW-15069:
--------------------------------------

Fantastic, thank you! I hesitated to call it a bug, since the code does work, 
just not as fast as I expected.

> [R] open_dataset very slow on heavily partitioned parquet dataset
> -----------------------------------------------------------------
>
>                 Key: ARROW-15069
>                 URL: https://issues.apache.org/jira/browse/ARROW-15069
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: R
>    Affects Versions: 6.0.1
>         Environment: macOS Mojave, R 4.1.1 
>            Reporter: Andy Teucher
>            Priority: Minor
>
> Opening a (particular) partitioned hive-style parquet dataset is very slow 
> (45s to 1 minute). The reproducible example below takes 780 csv files and 
> writes them to parquet using the {{open_dataset("csv files") |> 
> group_by(vars) |> write_dataset("parquet")}} pattern suggested 
> [here|https://arrow.apache.org/docs/r/articles/dataset.html#writing-datasets].
> Opening and querying the resulting parquet dataset is much slower than 
> doing the same on the original csv files, which is not what I expected.
> {code:java}
> library(arrow)
> library(dplyr)
> library(tictoc)
> zipfile <- "ahccd.zip"
> csv_dir <- "data/csv"
> parquet_dir <- "data/parquet"
> dir.create(csv_dir, recursive = TRUE)
> dir.create(parquet_dir, recursive = TRUE)
> # A zip of 780 csvs of daily temperature data at Canadian climate stations
> # (one file per station)
> download.file("https://www.dropbox.com/s/f0a18jp0lvbp1hp/ahccd.zip?dl=1",
>               destfile = zipfile)
> unzip(zipfile, exdir = csv_dir)
> csv_schema <- schema(
>   field("stn_id", string()),
>   field("stn_name", string()),
>   field("measure", string()),
>   field("date", date32()),
>   field("year", int64()),
>   field("month", int64()),
>   field("temp", double()),
>   field("province", string()),
>   field("stn_joined", string()),
>   field("element", string()),
>   field("unit", string()),
>   field("stn_last_updated", string()),
>   field("flag", string())
> )
> csv_format <- FileFormat$create(format = "csv", quoting = FALSE)
> # Write to parquet, partitioning on stn_id, year, measure
> tic("write csv to parquet")
> arrow::open_dataset("data/csv", schema = csv_schema,
>                     format = csv_format) |>
>   group_by(stn_id, year, measure) |>
>   write_dataset(parquet_dir, format = "parquet")
> toc()
> #> write csv to parquet: 2067.093 sec elapsed
> stations <- c("1100031", "1100120", "1100119", "1036B06")
> ## Query directory of original csv files
> tic("query csv")
> foo <- arrow::open_dataset(csv_dir, schema = csv_schema,
>                            format = csv_format) |>
>   filter(year >= 1990,
>          year <= 2020,
>          stn_id %in% stations,
>          measure == "daily_max") |>
>   collect()
> toc()
> #> query csv: 12.571 sec elapsed
> ## Query the hive-style parquet directory
> tic("query parquet")
> bar <- arrow::open_dataset("data/parquet") |>
>   filter(year >= 1990,
>          year <= 2020,
>          stn_id %in% stations,
>          measure == "daily_max") |>
>   collect()
> toc()
> #> query parquet: 41.79 sec elapsed
> ## It turns out that it is just the opening of the dataset 
> ## that takes so long
> tic("open parquet dataset")
> ds <- arrow::open_dataset(parquet_dir)
> toc()
> #> open parquet dataset: 45.581 sec elapsed
> ds
> #> FileSystemDataset with 191171 Parquet files
> #> stn_name: string
> #> date: date32[day]
> #> month: int64
> #> temp: double
> #> province: string
> #> stn_joined: string
> #> element: string
> #> unit: string
> #> stn_last_updated: string
> #> flag: string
> #> stn_id: string
> #> year: int32
> #> measure: string
> tic("query already opened dataset")
> ds |>
>   filter(year >= 1990,
>          year <= 2020,
>          stn_id %in% stations,
>          measure == "daily_max") |>
>   collect()
> #> # A tibble: 44,469 × 13
> #>    stn_name date       month  temp province stn_joined     element        unit
> #>    <chr>    <date>     <int> <dbl> <chr>    <chr>          <chr>          <chr>
> #>  1 ALBERNI  1992-01-01     1   6.5 BC       station joined Homogenized d… Deg C…
> #>  2 ALBERNI  1992-01-02     1   5.5 BC       station joined Homogenized d… Deg C…
> #>  3 ALBERNI  1992-01-03     1   3.5 BC       station joined Homogenized d… Deg C…
> #>  4 ALBERNI  1992-01-04     1   6   BC       station joined Homogenized d… Deg C…
> #>  5 ALBERNI  1992-01-05     1   0.5 BC       station joined Homogenized d… Deg C…
> #>  6 ALBERNI  1992-01-06     1   0   BC       station joined Homogenized d… Deg C…
> #>  7 ALBERNI  1992-01-07     1   0   BC       station joined Homogenized d… Deg C…
> #>  8 ALBERNI  1992-01-08     1   1.5 BC       station joined Homogenized d… Deg C…
> #>  9 ALBERNI  1992-01-09     1   4   BC       station joined Homogenized d… Deg C…
> #> 10 ALBERNI  1992-01-10     1   5.5 BC       station joined Homogenized d… Deg C…
> #> # … with 44,459 more rows, and 5 more variables: stn_last_updated <chr>,
> #> #   flag <chr>, stn_id <chr>, year <int>, measure <chr>
> toc()
> #> query already opened dataset: 0.356 sec elapsed
> {code}
> The above reprex is self-contained but takes a while to run: writing the 
> parquet dataset alone can take 30 minutes or more (about 34 minutes in the 
> run above). It also downloads a 380MB zip file of csvs from my Dropbox.
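
The printed dataset in the reprex reports 191171 Parquet files, and the timings show that opening it is the slow step. As a rough sketch that is not part of the original report, the base-R helper below counts the data files under a hive-style directory and the number of distinct values seen at each partition level, which can help judge how finely a dataset has been partitioned. The function name summarise_partitions and the ".parquet" file-name pattern are assumptions made for illustration.

```r
# Hypothetical helper (not from the report): summarise a hive-style dataset
# directory using base R only.
# Assumption: data files end in ".parquet" and partition directories are
# named "key=value", e.g. "stn_id=1100031/year=1992/measure=daily_max".
summarise_partitions <- function(dir, pattern = "\\.parquet$") {
  # Relative paths of all data files below `dir`
  files <- list.files(dir, pattern = pattern, recursive = TRUE)

  # Split each file's directory path into its "key=value" segments
  segments <- unlist(strsplit(dirname(files), "/", fixed = TRUE))
  segments <- segments[grepl("=", segments, fixed = TRUE)]
  keys <- sub("=.*$", "", segments)
  values <- sub("^[^=]*=", "", segments)

  list(
    # Total number of data files found
    n_files = length(files),
    # Number of distinct values per partition key
    n_values = vapply(split(values, keys),
                      function(v) length(unique(v)),
                      integer(1))
  )
}
```

Against the directory written by the reprex it could be called as summarise_partitions("data/parquet"), then inspected via the n_files and n_values elements of the result.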



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
