[jira] [Commented] (ARROW-16700) [C++] [R] [Datasets] aggregates on partitioning columns

2022-07-05 Thread Weston Pace (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-16700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17562926#comment-17562926
 ] 

Weston Pace commented on ARROW-16700:
-

> but it doesn't feel right that Scanner makes more columns than ReadRel if 
> we're going to be treating them as if they map one-to-one to each other.

I agree there.  The ReadRel should be able to specify these columns and, if it 
doesn't, these columns should not be included.

> [C++] [R] [Datasets] aggregates on partitioning columns
> ---
>
> Key: ARROW-16700
> URL: https://issues.apache.org/jira/browse/ARROW-16700
> Project: Apache Arrow
>  Issue Type: Bug
>  Components: C++, R
>Reporter: Jonathan Keane
>Assignee: Jeroen van Straten
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 9.0.0, 8.0.1
>
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> When summarizing a whole dataset (without group_by) with an aggregate, and 
> summarizing a partitioned column, arrow returns wrong data:
> {code:r}
> library(arrow, warn.conflicts = FALSE)
> library(dplyr, warn.conflicts = FALSE)
> df <- expand.grid(
>   some_nulls = c(0L, 1L, 2L),
>   year = 2010:2023,
>   month = 1:12,
>   day = 1:30
> )
> path <- tempfile()
> dir.create(path)
> write_dataset(df, path, partitioning = c("year", "month"))
> ds <- open_dataset(path)
> # with arrow the mins/maxes are off for partitioning columns
> ds %>%
>   summarise(n = n(), min_year = min(year), min_month = min(month),
>             min_day = min(day), max_year = max(year), max_month = max(month),
>             max_day = max(day)) %>%
>   collect()
> #> # A tibble: 1 × 7
> #>       n min_year min_month min_day max_year max_month max_day
> #>   <int>    <int>     <int>   <int>    <int>     <int>   <int>
> #> 1 15120     2023         1       1     2023        12      30
> # compared to what we get with dplyr
> df %>%
>   summarise(n = n(), min_year = min(year), min_month = min(month),
>             min_day = min(day), max_year = max(year), max_month = max(month),
>             max_day = max(day)) %>%
>   collect()
> #>       n min_year min_month min_day max_year max_month max_day
> #> 1 15120     2010         1       1     2023        12      30
> # even min alone is off:
> ds %>%
>   summarise(min_year = min(year)) %>%
>   collect()
> #> # A tibble: 1 × 1
> #>   min_year
> #>      <int>
> #> 1     2016
>
> # but non-partitioning columns are fine:
> ds %>%
>   summarise(min_day = min(day)) %>%
>   collect()
> #> # A tibble: 1 × 1
> #>   min_day
> #>     <int>
> #> 1       1
>
> # But with a group_by, this seems ok
> ds %>%
>   group_by(some_nulls) %>%
>   summarise(min_year = min(year)) %>%
>   collect()
> #> # A tibble: 3 × 2
> #>   some_nulls min_year
> #>        <int>    <int>
> #> 1          0     2010
> #> 2          1     2010
> #> 3          2     2010
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (ARROW-16700) [C++] [R] [Datasets] aggregates on partitioning columns

2022-07-05 Thread Jeroen van Straten (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-16700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17562771#comment-17562771
 ] 

Jeroen van Straten commented on ARROW-16700:


> I think you just need to update the title of the PR. I'm not sure if that 
> removes the linkage to this issue but that's probably ok. It should add 
> linkage to the correct PR.

The (more) correct issue being 
[ARROW-16904|https://issues.apache.org/jira/projects/ARROW/issues/ARROW-16904], 
right?

> I'm not entirely sure I understand your point on the join. The partition 
> columns I think can just be normal columns as far as Substrait is considered. 
> Are you maybe thinking of the __fragment_index, __filename, etc. columns?

The latter, I think 
([these|https://github.com/apache/arrow/blob/c1a1f47b8a2772fc270832902e7d788ee467ea08/cpp/src/arrow/dataset/scanner.cc#L917-L920]
 to be specific). I don't know if it would actually be an issue at all, but it 
doesn't feel right that Scanner makes more columns than ReadRel if we're going 
to be treating them as if they map one-to-one to each other.

As for a resolution to this issue, I've now locally rewritten MakeExecBatch to 
take the guarantee as its argument, use ExtractKnownFieldValues to turn it into 
the map of constant columns (a helper of SimplifyWithGuarantee; it turned out 
to be far easier to just reuse that, and it's also better suited to this 
purpose), and prefer the constant scalar from that map over the actual incoming 
data when the value is known. It's currently giving me failures, but my code is 
also still a mess from all the debug printing, so that's not too surprising.
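
For illustration, here is a minimal self-contained sketch of that approach. The names and types below (a plain `Column` struct, `make_exec_batch`, a `std::map` of constants) are hypothetical stand-ins, not Arrow's actual signatures; the point is only the "constant from the guarantee wins over incoming data" rule:

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <string>
#include <vector>

// Hypothetical stand-in for a batch column: either real data or a
// placeholder (nullopt models the implicit all-null partition column).
struct Column {
  std::string name;
  std::optional<int> value;
};

// Sketch of the rewritten MakeExecBatch: constants extracted from the
// guarantee (the kind of map ExtractKnownFieldValues would produce) are
// preferred over the incoming, possibly-null column data.
std::vector<Column> make_exec_batch(
    std::vector<Column> incoming,
    const std::map<std::string, int>& known_field_values) {
  for (auto& col : incoming) {
    auto it = known_field_values.find(col.name);
    if (it != known_field_values.end()) {
      col.value = it->second;  // prefer the constant from the guarantee
    }
  }
  return incoming;
}
```

With this rule, an aggregate downstream would see the partition value (e.g. `year = 2010`) rather than the null placeholder.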



[jira] [Commented] (ARROW-16700) [C++] [R] [Datasets] aggregates on partitioning columns

2022-07-05 Thread Weston Pace (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-16700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17562762#comment-17562762
 ] 

Weston Pace commented on ARROW-16700:
-

> So I guess that basically means that my PR is linked to the wrong issue? I'm 
> not sure how to neatly resolve that

I think you just need to update the title of the PR.  I'm not sure if that 
removes the linkage to this issue but that's probably ok.  It should add 
linkage to the correct issue.

> For the guarantee issue that remains, I'm inclined to place the blame on 
> Scanner rather than all the nodes other than filter and project, and to just 
> insert the code for a trivial projection into Scanner to leverage the 
> existing SimplifyWithGuarantee implementation

I agree the best fix is on the scanner and this change should happen within the 
scan node.

> I'm assuming that an expression that only selects an existing field will just 
> result in a pointer copy, and that evaluating a literal expression to a 
> scalar is also cheap (at least if the literal isn't massive). What do you 
> think?

I think you are correct here.  It should be cheap.
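
To make the "cheap" claim concrete, here is an illustrative sketch (not Arrow's actual expression API; `project_field`, `project_literal`, and the `Array` alias are invented for the example): selecting an existing column only copies a shared pointer to the same backing data, and a literal evaluates to a single scalar regardless of batch length:

```cpp
#include <cassert>
#include <memory>
#include <vector>

using Array = std::vector<int>;
using ArrayPtr = std::shared_ptr<Array>;

// A "field_ref"-style projection: returns the existing column unchanged,
// so the only cost is bumping a shared_ptr refcount, not copying data.
ArrayPtr project_field(const ArrayPtr& column) { return column; }

// A "literal"-style projection: produces one scalar, cheap as long as the
// literal itself is small.
int project_literal(int literal) { return literal; }

// Demonstrates that the field projection is a pointer copy: the result
// points at exactly the same buffer as the input column.
bool field_ref_is_pointer_copy() {
  auto col = std::make_shared<Array>(Array{2010, 2011, 2012});
  return project_field(col).get() == col.get();
}
```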

> I'm not 100% sure on that one because of the tag fields that Scanner normally 
> adds, which Substrait wouldn't know about. It feels fragile to leave them in 
> because I imagine they would affect column indices after a join if not 
> treated carefully. But I get your point.

I'm not entirely sure I understand your point on the join.  The partition 
columns I think can just be normal columns as far as Substrait is considered.  
Are you maybe thinking of the __fragment_index, __filename, etc. columns?



[jira] [Commented] (ARROW-16700) [C++] [R] [Datasets] aggregates on partitioning columns

2022-07-05 Thread Jeroen van Straten (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-16700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17562732#comment-17562732
 ] 

Jeroen van Straten commented on ARROW-16700:


So I guess that basically means that my PR is linked to the wrong issue? I'm 
not sure how to neatly resolve that, at least not without closing my PR and 
waiting for [~octalene] to fix min/max (and test it appropriately) instead.

For the guarantee issue that remains, I'm inclined to place the blame on 
Scanner rather than all the nodes other than filter and project, and to just 
insert the code for a trivial projection into Scanner to leverage the existing 
SimplifyWithGuarantee implementation. I'm assuming that an expression that only 
selects an existing field will just result in a pointer copy, and that 
evaluating a literal expression to a scalar is also cheap (at least if the 
literal isn't massive). What do you think?

> Future Substrait queries would, in theory, be able to create plans without 
> the preceding project node.

I'm not 100% sure on that one because of the tag fields that Scanner normally 
adds, which Substrait wouldn't know about. It feels fragile to leave them in 
because I imagine they would affect column indices after a join if not treated 
carefully. But I get your point.



[jira] [Commented] (ARROW-16700) [C++] [R] [Datasets] aggregates on partitioning columns

2022-07-05 Thread Weston Pace (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-16700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17562721#comment-17562721
 ] 

Weston Pace commented on ARROW-16700:
-

{quote}tl;dr it looks like min/max itself is broken and only aggregates 
the last partition it sees (in each thread); every Consume call casually 
ignores and overrides this->state. Extraordinary claims demand extraordinary 
evidence, especially since I only started looking at Acero in-depth and 
installed R about five hours ago, so here's my complete analysis for 
posterity...{quote}

Yes, min/max is broken in the way you describe.  This is captured in 
ARROW-16904.  [~octalene] is working on updating the unit tests so that we can 
reproduce this issue correctly.  Sorry, I hadn't realized this JIRA also 
involved a min/max, and I hadn't noticed the project workaround you mentioned.  
In any case, that is excellent analysis.

You are correct that a project node would fix this issue.  However, a project 
node isn't normally inserted immediately after a scan node.  In fact, what is 
happening here is that R always inserts a project node immediately _before_ an 
aggregate node.  Either way, that is a pretty significant workaround, as the 
project & filter nodes are already satisfied by the guarantee.

Still, I'd like to leave this issue in place for the moment, though maybe it 
doesn't need to be a blocker.  Future Substrait queries would, in theory, be 
able to create plans without the preceding project node.  At some point the 
scan node will not emit these columns unless they are asked for so I don't 
think a project to satisfy the column-dropping emit will always be necessary 
either.



[jira] [Commented] (ARROW-16700) [C++] [R] [Datasets] aggregates on partitioning columns

2022-07-04 Thread Jeroen van Straten (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-16700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17562191#comment-17562191
 ] 

Jeroen van Straten commented on ARROW-16700:


tl;dr it looks like min/max itself is broken and only aggregates [uninitialized 
garbage?] and the last partition it sees; every Consume call casually ignores 
and overrides this->state. Extraordinary claims demand extraordinary evidence, 
especially since I only started looking at Acero in-depth and installed R about 
five hours ago, so here's my complete analysis for posterity...

> For example, how does one resolve the query {{SELECT * FROM measurements 
> WHERE temp == 50}} when all they know for a batch is {{25 < temp < 75}}.

I would assume that in the case of a guarantee like {{25 < temp < 75}} the 
column {{temp}} is not optimized out of the physical schema as it would be for 
{{temp = 50}}, otherwise the data simply isn't there anymore. So, I don't see 
how

> However, no value is ever set on this column and so it ends up getting set 
> implicitly to {{NULL}}.

would apply in this case, and would expect that aggregation would work 
correctly under those circumstances. But I'm not sure how to verify this.

If above assessment is true, it seems to me that the problem is that either:

 * "trivial" guarantees aren't respected consistently throughout Acero, which 
is invalid because columns may already have been optimized out of the physical 
schema under the assumption that all values are known via the guarantee; or
 * if you take 
[this|https://github.com/apache/arrow/blob/897a4c0ce73c3fe07872beee2c1d2128e44f6dd4/cpp/src/arrow/compute/exec.h#L201]
 description of ExecBatch::guarantee at face value, the problem is that the 
guarantee doesn't actually evaluate to true at all in this case, because the 
columns it asserts a constant value for are in fact optimized out and would 
thus evaluate to null. This conflicts with [the way Scanner deals with 
guarantees|https://github.com/apache/arrow/blob/897a4c0ce73c3fe07872beee2c1d2128e44f6dd4/cpp/src/arrow/dataset/scanner.cc#L911-L914],
 though.

Put differently, different parts of the codebase seem to treat {{guarantee}} in 
two incompatible ways (at least according to the comments).

Looking through the code I don't see _any_ mention of guarantees in nodes other 
than filter and project, so I would expect all nodes to fail on this, _unless_ 
they end up following a projection node, which would materialize the columns 
based on the expressions derived from the guarantee. It looks like the group-by 
example only works because a projection is inserted in that case:

{code}
5:SinkNode{}
  4:ProjectNode{projection=[some_nulls, min_year]}
    3:ProjectNode{projection=[some_nulls, min_year]}
      2:GroupByNode{keys=["some_nulls"], aggregates=[
          hash_min(min_year, {skip_nulls=false, min_count=0}),
      ]}
        1:ProjectNode{projection=["min_year": year, some_nulls]}
          0:SourceNode{}
{code}

... but something isn't adding up for the failing case, because I'm getting:

{code}
4:SinkNode{}
  3:ProjectNode{projection=[n, min_year, min_month, min_day, max_year, max_month, max_day]}
    2:ScalarAggregateNode{aggregates=[
        sum(n, {skip_nulls=true, min_count=1}),
        min(min_year, {skip_nulls=false, min_count=0}),
        min(min_month, {skip_nulls=false, min_count=0}),
        min(min_day, {skip_nulls=false, min_count=0}),
        max(max_year, {skip_nulls=false, min_count=0}),
        max(max_month, {skip_nulls=false, min_count=0}),
        max(max_day, {skip_nulls=false, min_count=0}),
    ]}
      1:ProjectNode{projection=["n": 1, "min_year": year, "min_month": month, "min_day": day, "max_year": year, "max_month": month, "max_day": day]}
        0:SourceNode{}
{code}

In fact, judging by the input and output of 1:ProjectNode in either case, it 
looks like it'd be difficult to make a plan that doesn't need one, since it 
would normally at least get rid of the fragment source information.

And indeed, if I spam sufficient debug output, I see that 1:ProjectNode _is_ 
indeed materializing the columns accordingly, and 2:ScalarAggregateNode _is_ 
actually getting completely-materialized inputs passed to InputReceived. Hmmm...

Some more debug prints and clicking through code later I find that the bad 
minima and maxima are originating from 
[here|https://github.com/apache/arrow/blob/897a4c0ce73c3fe07872beee2c1d2128e44f6dd4/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h#L440-L451].
 Each partition corresponds to its own call. I expected something to be subtly 
wrong here, but it doesn't appear to be subtle at all: local is created 
[here|https://github.com/apache/arrow/blob/897a4c0ce73c3fe07872beee2c1d2128e44f6dd4/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h#L440]
 and is later used to override this->state 
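
The failure mode being described can be reduced to a self-contained sketch (this is illustrative pseudocode in the shape of an aggregate kernel, not the actual Arrow source): `Consume` builds a fresh `local` state for each batch and then assigns it over the accumulated state, so only the last batch (per thread) survives; the fix is to merge instead of assign:

```cpp
#include <algorithm>
#include <cassert>
#include <climits>
#include <vector>

struct MinMaxState {
  int min = INT_MAX;
  int max = INT_MIN;
};

struct MinMaxImpl {
  MinMaxState state;
  bool buggy;

  void Consume(const std::vector<int>& batch) {
    MinMaxState local;  // fresh state covering this batch only
    for (int v : batch) {
      local.min = std::min(local.min, v);
      local.max = std::max(local.max, v);
    }
    if (buggy) {
      state = local;  // BUG: discards everything from earlier batches
    } else {
      state.min = std::min(state.min, local.min);  // fix: merge states
      state.max = std::max(state.max, local.max);
    }
  }
};

// Feed two "partitions" (2010-2011, then 2023) and report the minimum seen.
int min_after_two_batches(bool buggy) {
  MinMaxImpl impl{MinMaxState{}, buggy};
  impl.Consume({2010, 2011});
  impl.Consume({2023});
  return impl.state.min;
}
```

The buggy variant reports 2023 as the minimum, which matches the observed `min_year = 2023` from the reproduction above.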

[jira] [Commented] (ARROW-16700) [C++] [R] [Datasets] aggregates on partitioning columns

2022-06-30 Thread Weston Pace (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-16700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17561219#comment-17561219
 ] 

Weston Pace commented on ARROW-16700:
-

CC [~bkietz] for second opinion



[jira] [Commented] (ARROW-16700) [C++] [R] [Datasets] aggregates on partitioning columns

2022-06-30 Thread Weston Pace (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-16700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17561218#comment-17561218
 ] 

Weston Pace commented on ARROW-16700:
-

So the root cause here is how partition columns are handed to the exec plan.  
In the datasets API a partitioning doesn't necessarily yield a single value for 
a batch, but rather an expression.

For example, the folder `/foo=7/chunk-0.parquet` will yield the expression `foo 
== 7` instead of the value `foo: 7`.  This allows for rather novel partitioning 
schemes like `/temp=med/chunk-0.parquet` which could attach the expression `25 
< temp < 75`.

Unfortunately, this cleverness cannot really be utilized by the execution 
engine.  For example, how does one resolve the query `SELECT * FROM 
measurements WHERE temp == 50` when all that is known for a batch is `25 < 
temp < 75`?

At the moment, these expressions are attached to the incoming batch as a 
"guarantee".  In addition, we also add a new column to the batch (e.g. `foo` or 
`temp`).  However, no value is ever set on this column and so it ends up 
getting set implicitly to `NULL`.  This guarantee is then used by project and 
filter nodes to simplify an expression.

This is why the query `SELECT foo FROM dataset WHERE foo == 7` works.  The 
expression `foo == 7` gets simplified by the guarantee (`foo == 7`) to `true` 
and so the row is included (even though the value of the row is null).  The 
projection `field_ref('foo')` gets simplified to `scalar(7)` and so the 
underlying array (which is full of nulls) is not looked at.
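
A toy version of that simplification (the helper names below are invented for the example; Arrow's real mechanism is `Expression::SimplifyWithGuarantee`): under the guarantee `foo == 7`, the filter `foo == 7` collapses to `true` and the projection of `foo` collapses to the literal `7`, so the null-filled backing array is never read:

```cpp
#include <cassert>
#include <optional>
#include <string>

// A guarantee of the simple form "field == constant".
struct Guarantee {
  std::string field;
  int value;
};

// Simplify the filter "field == constant": if the guarantee pins the same
// field to the same value, the filter is trivially true and the
// (null-filled) column data never needs to be inspected.
bool simplify_filter(const std::string& field, int constant,
                     const Guarantee& g) {
  return g.field == field && g.value == constant;
}

// Simplify a field_ref projection: if the guarantee pins this field, the
// projection becomes a literal scalar instead of a read of the column.
std::optional<int> simplify_projection(const std::string& field,
                                       const Guarantee& g) {
  if (g.field == field) return g.value;
  return std::nullopt;  // must fall back to the actual column data
}
```

An aggregate node, by contrast, skips this step and reads the raw nulls, which is exactly the discrepancy described in the next paragraph.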

However, aggregate nodes do not simplify with a guarantee.  So, instead, they 
see the raw underlying value (null) and it doesn't get processed correctly by 
the aggregate node.

One quick fix would be to get rid of the guarantee concept entirely.  When we 
have an incoming partition expression we should:

 * If the expression is an equality we replace it with a scalar.
 * If the expression is not an equality we raise an error.
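
The two bullets above can be sketched as follows (hypothetical `PartitionExpr` type invented for the example, not an Arrow API): only an equality partition expression can be materialized as a concrete column value; anything else is rejected with an error:

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Hypothetical partition expression: either an equality ("foo == 7") or
// something more exotic ("25 < temp < 75").
struct PartitionExpr {
  bool is_equality;
  std::string field;
  int value;  // only meaningful when is_equality is true
};

// Proposed quick fix: materialize equalities as concrete scalars and
// refuse anything the execution engine cannot represent as a value.
int materialize_partition_value(const PartitionExpr& expr) {
  if (!expr.is_equality) {
    throw std::runtime_error(
        "partition expression is not an equality; cannot materialize");
  }
  return expr.value;
}

// Helper confirming that non-equality expressions are rejected.
bool rejects_non_equality(const PartitionExpr& expr) {
  try {
    materialize_partition_value(expr);
    return false;
  } catch (const std::runtime_error&) {
    return true;
  }
}
```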



[jira] [Commented] (ARROW-16700) [C++] [R] [Datasets] aggregates on partitioning columns

2022-06-29 Thread Jonathan Keane (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-16700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17560667#comment-17560667
 ] 

Jonathan Keane commented on ARROW-16700:


[~westonpace] I'm not sure if this is related to ARROW-16904 or ARROW-16807, 
but it's another wrong-data ticket we should take a look at.
