Jonathan Keane created ARROW-16100: -------------------------------------- Summary: [C++] TPC-H generator cleanups Key: ARROW-16100 URL: https://issues.apache.org/jira/browse/ARROW-16100 Project: Apache Arrow Issue Type: Bug Components: C++ Reporter: Jonathan Keane
An umbrella issue for a number of issues I've run into with our TPC-H generator. h2. We emit fixed_size_binary fields with NULs padding the strings. Ideally we would either emit these as utf8 strings like the others, or we would have a toggle to emit them as such (though see below about needing to strip the NULs). When I try to run these through the query engine, I get a number of segfaults or hangs on several of the TPC-H queries. Additionally, even after converting these to utf8/string types, I still need to strip out the NULs in order to actually query against them: {code} library(arrow, warn.conflicts = FALSE) #> See arrow_info() for available features library(dplyr, warn.conflicts = FALSE) options(arrow.skip_nul = TRUE) tab <- read_parquet("data_arrow_raw/nation_1.parquet", as_data_frame = FALSE) tab #> Table #> 25 rows x 4 columns #> $N_NATIONKEY <int32> #> $N_NAME <fixed_size_binary[25]> #> $N_REGIONKEY <int32> #> $N_COMMENT <string> # This will not work (Though is how the TPC-H queries are structured) tab %>% filter(N_NAME == "JAPAN") %>% collect() #> # A tibble: 0 × 4 #> # … with 4 variables: N_NATIONKEY <int>, N_NAME <fixed_size_binary<25>>, #> # N_REGIONKEY <int>, N_COMMENT <chr> # Instead, we need to create the nul padded string to do the comparison japan_raw <- as.raw( c(0x4a, 0x41, 0x50, 0x41, 0x4e, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00) ) # confirming this is the same thing as in the data japan_raw == as.vector(tab$N_NAME)[[13]] #> [1] TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE #> [16] TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE tab %>% filter(N_NAME == Scalar$create(japan_raw, type = fixed_size_binary(25))) %>% collect() #> # A tibble: 1 × 4 #> N_NATIONKEY #> <int> #> 1 12 #> # … with 3 more variables: N_NAME <fixed_size_binary<25>>, N_REGIONKEY <int>, #> # N_COMMENT <chr> {code} Here is the code I've been using to cast and strip these out after
the fact: {code} library(arrow, warn.conflicts = FALSE) options(arrow.skip_nul = TRUE) options(arrow.use_altrep = FALSE) tables <- arrowbench:::tpch_tables for (table_name in tables) { message("Working on ", table_name) tab <- read_parquet(glue::glue("./data_arrow_raw/{table_name}_1.parquet"), as_data_frame=FALSE) for (col in tab$schema$fields) { if (inherits(col$type, "FixedSizeBinary")) { message("Rewritting ", col$name) tab[[col$name]] <- Array$create(as.vector(tab[[col$name]]$cast(string()))) } } tab <- write_parquet(tab, glue::glue("./data/{table_name}_1.parquet")) } {code} h2. The data does not produce correct answers When checking these against the known-good answers for scale factor 1, all of the queries are off (two are close enough that they get past arrowbench's pretty loose validation). One example: {code} library(arrowbench) library(dplyr, warn.conflicts=FALSE) options(arrow.skip_nul = TRUE) input_funcs <- get_input_func( engine = "arrow", scale_factor = 1, query_id = 1, format = "parquet" ) result <- get_query_func(1, "arrow")(input_funcs) ans <- tpch_answer(1, query_id = 1) waldo::compare(result, ans) #> old vs new #> sum_qty sum_base_price sum_disc_price sum_charge avg_qty avg_price count_order #> - old[1, ] 37645216 56445778153 53624556113 55767898658 25.48000 38201.84 1477567 #> + new[1, ] 37734107 56586554401 53758257135 55909065223 25.52201 38273.13 1478493 #> - old[2, ] 992235 1486295723 1411376726 1467883176 25.48000 38172.79 38936 #> + new[2, ] 991417 1487504710 1413082168 1469649223 25.51647 38284.47 38854 #> - old[3, ] 74391871 111564378395 105981512916 110221376016 25.51000 38254.33 2916386 #> + new[3, ] 74476040 111701729698 106118230308 110367043872 25.50223 38249.12 2920374 #> - old[4, ] 37717055 56550644913 53721555527 55872979092 25.50000 38236.53 1478969 #> + new[4, ] 37719753 56568041381 53741292685 55889619120 25.50579 38250.85 1478870 #> #> `old$sum_qty` is a double vector (37645216, 992235, 74391871, 37717055) #> `new$sum_qty` is 
an integer vector (37734107, 991417, 74476040, 37719753) #> #> `old$sum_base_price`: 56445778153 1486295723 111564378395 56550644913 #> `new$sum_base_price`: 56586554401 1487504710 111701729698 56568041381 #> #> `old$sum_disc_price`: 53624556113 1411376726 105981512916 53721555527 #> `new$sum_disc_price`: 53758257135 1413082168 106118230308 53741292685 #> #> `old$sum_charge`: 55767898658 1467883176 110221376016 55872979092 #> `new$sum_charge`: 55909065223 1469649223 110367043872 55889619120 #> #> `old$avg_qty`: 25.480 25.480 25.510 25.500 #> `new$avg_qty`: 25.522 25.516 25.502 25.506 #> #> `old$avg_price`: 38202 38173 38254 38237 #> `new$avg_price`: 38273 38284 38249 38251 #> #> `old$count_order`: 1477567 38936 2916386 1478969 #> `new$count_order`: 1478493 38854 2920374 1478870 {code} Note: you can generate this TPC-H data from https://github.com/apache/arrow/pull/12769 with the code below. The file shuffling at the end of that snippet is needed because we can currently only write datasets from exec nodes (without materializing everything in memory), so we rename and move the files as if they had been single-file writes: {code} path <- tpch_dbgen_write(1, "some/path") from_dataset_to_parquet <- function(path, scale_factor) { ds_files <- list.files(path, recursive = TRUE, full.names = TRUE) # we can only deal with single parquet files in each partition this way if (!all(grepl("data-0.parquet$", ds_files))) { stop("At least one partition has more than one file") } ds_files_to <- gsub( "/data-0.parquet", paste0("_", format(scale_factor, scientific = FALSE), ".parquet"), ds_files ) file.rename(ds_files, ds_files_to) # cleanup empty folders, this might be a bit aggressive folders_to_remove <- gsub("/data-0.parquet", "", ds_files) unlink(folders_to_remove, recursive = TRUE) } from_dataset_to_parquet(path, 1) {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)