This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 92015d941dd [bug](function) fix first/last value return error with
ignore null (#44996)
92015d941dd is described below
commit 92015d941dd5e88fea1de7b08e2968374c4c4941
Author: zhangstar333 <[email protected]>
AuthorDate: Thu Dec 12 15:30:14 2024 +0800
[bug](function) fix first/last value return error with ignore null (#44996)
### What problem does this PR solve?
Problem Summary:
Requires the following FE PRs: https://github.com/apache/doris/pull/45065
https://github.com/apache/doris/pull/45264
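1. first_value: even if a value has already been set, it should not return
directly. It must first check whether arg_ignore_null is enabled, because the
stored value may be NULL.
2. last_value: when arg_ignore_null is enabled, it should search
[frame_start, frame_end) backwards in a while loop for a non-null value. If
one is found, it is set and the function returns immediately. If none is
found, a value set by a previous window is kept if there is one; otherwise
the result is set to NULL.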
### Release note
Fix first_value/last_value returning wrong results when the ignore-null parameter is true.
---
be/src/pipeline/exec/analytic_source_operator.cpp | 14 +-
.../aggregate_function_reader_first_last.h | 2 +
.../aggregate_function_window.cpp | 2 +
.../aggregate_function_window.h | 49 +++---
.../correctness_p0/test_first_value_window.out | 100 +++++++++++
.../correctness_p0/test_first_value_window.groovy | 189 +++++++++++++++++++++
6 files changed, 328 insertions(+), 28 deletions(-)
diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp
b/be/src/pipeline/exec/analytic_source_operator.cpp
index 3a9156f45b6..fe0ab0b148e 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -352,17 +352,17 @@ Status AnalyticLocalState::_get_next_for_rows(size_t
current_block_rows) {
int64_t range_start, range_end;
if
(!_parent->cast<AnalyticSourceOperatorX>()._window.__isset.window_start &&
_parent->cast<AnalyticSourceOperatorX>()._window.window_end.type ==
- TAnalyticWindowBoundaryType::
- CURRENT_ROW) { //[preceding,
current_row],[current_row, following]
+ TAnalyticWindowBoundaryType::CURRENT_ROW) {
+ // [preceding, current_row], [current_row, following] rewrite it's
same
+ // as could reuse the previous calculate result, so don't call
_reset_agg_status function
+ // going on calculate, add up data, no need to reset state
range_start = _shared_state->current_row_position;
- range_end = _shared_state->current_row_position +
- 1; //going on calculate,add up data, no need to reset
state
+ range_end = _shared_state->current_row_position + 1;
} else {
_reset_agg_status();
range_end = _shared_state->current_row_position + _rows_end_offset
+ 1;
- if (!_parent->cast<AnalyticSourceOperatorX>()
- ._window.__isset
- .window_start) { //[preceding, offset]
--unbound: [preceding, following]
+ //[preceding, offset] --unbound: [preceding, following]
+ if
(!_parent->cast<AnalyticSourceOperatorX>()._window.__isset.window_start) {
range_start = _partition_by_start.pos;
} else {
range_start = _shared_state->current_row_position +
_rows_start_offset;
diff --git
a/be/src/vec/aggregate_functions/aggregate_function_reader_first_last.h
b/be/src/vec/aggregate_functions/aggregate_function_reader_first_last.h
index 2657feb9380..8efea2dc6fc 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_reader_first_last.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_reader_first_last.h
@@ -142,6 +142,8 @@ public:
bool has_set_value() { return _has_value; }
+ bool is_null() { return _data_value.is_null(); }
+
protected:
StoreType _data_value;
bool _has_value = false;
diff --git a/be/src/vec/aggregate_functions/aggregate_function_window.cpp
b/be/src/vec/aggregate_functions/aggregate_function_window.cpp
index fcb0072abe8..9df45611f0f 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_window.cpp
+++ b/be/src/vec/aggregate_functions/aggregate_function_window.cpp
@@ -41,6 +41,8 @@ AggregateFunctionPtr
create_function_lead_lag_first_last(const String& name,
WhichDataType which(*type);
bool arg_ignore_null_value = false;
+ // FE have rewrite case first_value(k1,false)--->first_value(k1)
+ // so size is 2, must will be arg_ignore_null_value
if (argument_types.size() == 2) {
DCHECK(name == "first_value" || name == "last_value") << "invalid
function name: " << name;
arg_ignore_null_value = true;
diff --git a/be/src/vec/aggregate_functions/aggregate_function_window.h
b/be/src/vec/aggregate_functions/aggregate_function_window.h
index 0011ae3aba9..13fa8e74751 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_window.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_window.h
@@ -455,31 +455,28 @@ struct WindowFunctionLagImpl : Data {
static const char* name() { return "lag"; }
};
-// TODO: first_value && last_value in some corner case will be core,
-// if need to simply change it, should set them to always nullable insert into
null value, and register in cpp maybe be change
-// But it's may be another better way to handle it
template <typename Data, bool arg_ignore_null = false>
struct WindowFunctionFirstImpl : Data {
void add_range_single_place(int64_t partition_start, int64_t
partition_end, int64_t frame_start,
int64_t frame_end, const IColumn** columns) {
- if (this->has_set_value()) {
+ // case 1: (has_set_value() = true && arg_ignore_null = false)
+ // case 2: (has_set_value() = true && arg_ignore_null = true &&
is_null() = false)
+ if ((this->has_set_value()) &&
+ (!arg_ignore_null || (arg_ignore_null && !this->is_null()))) {
return;
}
- if (frame_start <= frame_end &&
- frame_end <= partition_start) { //rewrite last_value when under
partition
- this->set_is_null(); //so no need more judge
+ DCHECK_LE(frame_start, frame_end);
+ if (frame_start >= partition_end || frame_end <= partition_start) {
+ this->set_is_null();
return;
}
frame_start = std::max<int64_t>(frame_start, partition_start);
if constexpr (arg_ignore_null) {
frame_end = std::min<int64_t>(frame_end, partition_end);
-
- auto& second_arg = assert_cast<const
ColumnVector<UInt8>&>(*columns[1]);
- auto ignore_null_value = second_arg.get_data()[0];
-
- if (ignore_null_value && columns[0]->is_nullable()) {
- auto& arg_nullable = assert_cast<const
ColumnNullable&>(*columns[0]);
+ if (columns[0]->is_nullable()) {
+ const auto& arg_nullable = assert_cast<const
ColumnNullable&>(*columns[0]);
+ // the valid range is: [frame_start, frame_end)
while (frame_start < frame_end - 1 &&
arg_nullable.is_null_at(frame_start)) {
frame_start++;
}
@@ -505,15 +502,25 @@ struct WindowFunctionLastImpl : Data {
if constexpr (arg_ignore_null) {
frame_start = std::max<int64_t>(frame_start, partition_start);
-
- auto& second_arg = assert_cast<const
ColumnVector<UInt8>&>(*columns[1]);
- auto ignore_null_value = second_arg.get_data()[0];
-
- if (ignore_null_value && columns[0]->is_nullable()) {
- auto& arg_nullable = assert_cast<const
ColumnNullable&>(*columns[0]);
- while (frame_start < (frame_end - 1) &&
arg_nullable.is_null_at(frame_end - 1)) {
- frame_end--;
+ if (columns[0]->is_nullable()) {
+ const auto& arg_nullable = assert_cast<const
ColumnNullable&>(*columns[0]);
+ // wants find a not null value in [frame_start, frame_end)
+ // iff has find: set_value and return directly
+ // iff not find: the while loop is finished
+ // case 1: iff has_set_value, means the previous window
have value, could reuse it, so return directly
+ // case 2: iff not has_set_value, means there is none
value, set it's to NULL
+ while (frame_start < frame_end) {
+ if (arg_nullable.is_null_at(frame_end - 1)) {
+ frame_end--;
+ } else {
+ this->set_value(columns, frame_end - 1);
+ return;
+ }
}
+ if (!this->has_set_value()) {
+ this->set_is_null();
+ }
+ return;
}
}
diff --git a/regression-test/data/correctness_p0/test_first_value_window.out
b/regression-test/data/correctness_p0/test_first_value_window.out
index 9951ad95c60..73dbcf3ed34 100644
--- a/regression-test/data/correctness_p0/test_first_value_window.out
+++ b/regression-test/data/correctness_p0/test_first_value_window.out
@@ -41,3 +41,103 @@
11 23 04-23-13 \N 10 10 10
12 24 02-24-10-21 \N \N \N \N
+-- !select_default4 --
+a 1 1 1 0
+a \N 1 \N 1
+a \N 1 \N 2
+a \N 1 \N 3
+b \N \N \N 4
+b 3 3 3 5
+b \N 3 \N 6
+b 2 2 2 7
+
+-- !select_default5 --
+a \N \N \N 0
+a 1 1 \N 1
+a \N 1 \N 2
+a \N 1 \N 3
+b \N \N \N 4
+b 3 3 \N 5
+b \N 3 \N 6
+b 2 3 \N 7
+
+-- !select_default_desc --
+a 2 3
+a \N 2
+a \N 1
+a 1 0
+b 2 7
+b \N 6
+b 3 5
+b \N 4
+
+-- !select_default_asc --
+a 1 0
+a \N 1
+a \N 2
+a 2 3
+b \N 4
+b 3 5
+b \N 6
+b 2 7
+
+-- !select_default_last_rewrite_first --
+a 1 1 0
+a \N 1 1
+a \N 1 2
+a 2 1 3
+b \N \N 4
+b 3 3 5
+b \N 3 6
+b 2 3 7
+
+-- !select_default6 --
+a \N 2 \N 0
+a 1 2 1 1
+a 2 2 2 2
+a \N 2 2 3
+b \N 2 \N 4
+b 3 2 3 5
+b \N 2 3 6
+b 2 2 2 7
+
+-- !select_default_last_rewrite_first2 --
+a 1 1 0
+a \N 1 1
+a \N 1 2
+a 2 2 3
+b \N \N 4
+b 3 3 5
+b \N 3 6
+b 2 2 7
+
+-- !select_default7 --
+a 1 1 1 1 1 0
+a \N 1 1 1 1 1
+a \N 1 1 1 1 2
+a 2 2 2 2 1 3
+b \N \N \N \N \N 4
+b 3 3 3 3 3 5
+b \N 3 3 3 3 6
+b 2 2 2 2 3 7
+
+-- !select_default8 --
+a 1 2 0
+a \N \N 1
+a \N \N 2
+a 2 \N 3
+b \N 2 4
+b 3 \N 5
+b \N \N 6
+b 2 \N 7
+
+-- !select_default9 --
+a 1 2 0
+a \N \N 1
+a \N \N 2
+a 2 \N 3
+b \N 2 4
+b 3 \N 5
+b \N \N 6
+b 2 \N 7
+
diff --git
a/regression-test/suites/correctness_p0/test_first_value_window.groovy
b/regression-test/suites/correctness_p0/test_first_value_window.groovy
index 8d0a3097056..7c1582e0e61 100644
--- a/regression-test/suites/correctness_p0/test_first_value_window.groovy
+++ b/regression-test/suites/correctness_p0/test_first_value_window.groovy
@@ -159,4 +159,193 @@ suite("test_first_value_window") {
,first_value(`state`, 1) over(partition by `myday` order by
`time_col` rows between 1 preceding and 1 following) v3
from ${tableName3} order by `id`, `myday`, `time_col`;
"""
+
+ qt_select_default4 """
+ SELECT uid
+ ,amt
+ ,LAST_VALUE(amt, true) OVER(PARTITION BY uid ORDER BY time_s ASC
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) amt1
+ ,LAST_VALUE(amt, false) OVER(PARTITION BY uid ORDER BY time_s ASC
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) amt2
+ ,time_s
+ FROM (
+ SELECT 'a' AS uid, 1 AS amt, 0 AS time_s UNION ALL
+ SELECT 'a' AS uid, null AS amt, 1 AS time_s UNION ALL
+ SELECT 'a' AS uid, null AS amt, 2 AS time_s UNION ALL
+ SELECT 'a' AS uid, null AS amt, 3 AS time_s UNION ALL
+ SELECT 'b' AS uid, null AS amt, 4 AS time_s UNION ALL
+ SELECT 'b' AS uid, 3 AS amt, 5 AS time_s UNION ALL
+ SELECT 'b' AS uid, null AS amt, 6 AS time_s UNION ALL
+ SELECT 'b' AS uid, 2 AS amt, 7 AS time_s
+ ) t
+ ORDER BY uid, time_s
+ ;
+ """
+
+ qt_select_default5 """
+ SELECT uid
+ ,amt
+ ,FIRST_VALUE(amt, true) OVER(PARTITION BY uid ORDER BY time_s ASC
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) amt1
+ ,FIRST_VALUE(amt, false) OVER(PARTITION BY uid ORDER BY time_s ASC
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) amt2
+ ,time_s
+ FROM (
+ SELECT 'a' AS uid, NULL AS amt, 0 AS time_s UNION ALL
+ SELECT 'a' AS uid, 1 AS amt, 1 AS time_s UNION ALL
+ SELECT 'a' AS uid, null AS amt, 2 AS time_s UNION ALL
+ SELECT 'a' AS uid, null AS amt, 3 AS time_s UNION ALL
+ SELECT 'b' AS uid, null AS amt, 4 AS time_s UNION ALL
+ SELECT 'b' AS uid, 3 AS amt, 5 AS time_s UNION ALL
+ SELECT 'b' AS uid, null AS amt, 6 AS time_s UNION ALL
+ SELECT 'b' AS uid, 2 AS amt, 7 AS time_s
+ ) t
+ ORDER BY uid, time_s
+ ;
+ """
+ qt_select_default_desc """
+ SELECT uid
+ ,amt
+ ,time_s
+ FROM (
+ SELECT 'a' AS uid, 1 AS amt, 0 AS time_s UNION ALL
+ SELECT 'a' AS uid, null AS amt, 1 AS time_s UNION ALL
+ SELECT 'a' AS uid, null AS amt, 2 AS time_s UNION ALL
+ SELECT 'a' AS uid, 2 AS amt, 3 AS time_s UNION ALL
+ SELECT 'b' AS uid, null AS amt, 4 AS time_s UNION ALL
+ SELECT 'b' AS uid, 3 AS amt, 5 AS time_s UNION ALL
+ SELECT 'b' AS uid, null AS amt, 6 AS time_s UNION ALL
+ SELECT 'b' AS uid, 2 AS amt, 7 AS time_s
+ ) t
+ order by uid,time_s desc;
+ """
+
+ qt_select_default_asc """
+ SELECT uid
+ ,amt
+ ,time_s
+ FROM (
+ SELECT 'a' AS uid, 1 AS amt, 0 AS time_s UNION ALL
+ SELECT 'a' AS uid, null AS amt, 1 AS time_s UNION ALL
+ SELECT 'a' AS uid, null AS amt, 2 AS time_s UNION ALL
+ SELECT 'a' AS uid, 2 AS amt, 3 AS time_s UNION ALL
+ SELECT 'b' AS uid, null AS amt, 4 AS time_s UNION ALL
+ SELECT 'b' AS uid, 3 AS amt, 5 AS time_s UNION ALL
+ SELECT 'b' AS uid, null AS amt, 6 AS time_s UNION ALL
+ SELECT 'b' AS uid, 2 AS amt, 7 AS time_s
+ ) t
+ order by uid,time_s ASC;
+ """
+
+ // FIRST_VALUE: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+ qt_select_default_last_rewrite_first """
+ SELECT uid
+ ,amt
+ ,(LAST_VALUE(amt, true) OVER(PARTITION BY uid ORDER BY time_s DESC
ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)) amt3
+ ,time_s
+ FROM (
+ SELECT 'a' AS uid, 1 AS amt, 0 AS time_s UNION ALL
+ SELECT 'a' AS uid, null AS amt, 1 AS time_s UNION ALL
+ SELECT 'a' AS uid, null AS amt, 2 AS time_s UNION ALL
+ SELECT 'a' AS uid, 2 AS amt, 3 AS time_s UNION ALL
+ SELECT 'b' AS uid, null AS amt, 4 AS time_s UNION ALL
+ SELECT 'b' AS uid, 3 AS amt, 5 AS time_s UNION ALL
+ SELECT 'b' AS uid, null AS amt, 6 AS time_s UNION ALL
+ SELECT 'b' AS uid, 2 AS amt, 7 AS time_s
+ ) t
+ ORDER BY uid, time_s;
+ """
+
+ qt_select_default6 """
+ SELECT uid
+ ,amt
+ ,LAST_VALUE(amt, true) OVER(PARTITION BY uid ORDER BY time_s ASC ROWS
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED following) amt1
+ ,LAST_VALUE(amt, true) OVER(PARTITION BY uid ORDER BY time_s ASC ROWS
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) amt2
+ ,time_s
+ FROM (
+ SELECT 'a' AS uid, null AS amt, 0 AS time_s UNION ALL
+ SELECT 'a' AS uid, 1 AS amt, 1 AS time_s UNION ALL
+ SELECT 'a' AS uid, 2 AS amt, 2 AS time_s UNION ALL
+ SELECT 'a' AS uid, null AS amt, 3 AS time_s UNION ALL
+ SELECT 'b' AS uid, null AS amt, 4 AS time_s UNION ALL
+ SELECT 'b' AS uid, 3 AS amt, 5 AS time_s UNION ALL
+ SELECT 'b' AS uid, null AS amt, 6 AS time_s UNION ALL
+ SELECT 'b' AS uid, 2 AS amt, 7 AS time_s
+ ) t
+ ORDER BY uid, time_s
+ ;
+ """
+
+ //last value: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+ qt_select_default_last_rewrite_first2 """
+ SELECT uid
+ ,amt
+ ,(FIRST_VALUE(amt, true) OVER(PARTITION BY uid ORDER BY time_s DESC
ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)) amt3
+ ,time_s
+ FROM (
+ SELECT 'a' AS uid, 1 AS amt, 0 AS time_s UNION ALL
+ SELECT 'a' AS uid, null AS amt, 1 AS time_s UNION ALL
+ SELECT 'a' AS uid, null AS amt, 2 AS time_s UNION ALL
+ SELECT 'a' AS uid, 2 AS amt, 3 AS time_s UNION ALL
+ SELECT 'b' AS uid, null AS amt, 4 AS time_s UNION ALL
+ SELECT 'b' AS uid, 3 AS amt, 5 AS time_s UNION ALL
+ SELECT 'b' AS uid, null AS amt, 6 AS time_s UNION ALL
+ SELECT 'b' AS uid, 2 AS amt, 7 AS time_s
+ ) t
+ ORDER BY uid, time_s;
+ """
+
+ qt_select_default7 """
+ SELECT uid
+ ,amt
+ ,COALESCE(LAST_VALUE(amt, true) OVER(PARTITION BY uid ORDER BY time_s
ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)) amt1
+ ,COALESCE(LAST_VALUE(amt, true) OVER(PARTITION BY uid ORDER BY time_s
ASC ROWS BETWEEN 100 PRECEDING AND CURRENT ROW)) amt_not
+ ,COALESCE(FIRST_VALUE(amt, true) OVER(PARTITION BY uid ORDER BY time_s
DESC ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)) amt2
+ ,COALESCE(LAST_VALUE(amt, true) OVER(PARTITION BY uid ORDER BY time_s
DESC ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)) amt3
+ ,time_s
+ FROM (
+ SELECT 'a' AS uid, 1 AS amt, 0 AS time_s UNION ALL
+ SELECT 'a' AS uid, null AS amt, 1 AS time_s UNION ALL
+ SELECT 'a' AS uid, null AS amt, 2 AS time_s UNION ALL
+ SELECT 'a' AS uid, 2 AS amt, 3 AS time_s UNION ALL
+ SELECT 'b' AS uid, null AS amt, 4 AS time_s UNION ALL
+ SELECT 'b' AS uid, 3 AS amt, 5 AS time_s UNION ALL
+ SELECT 'b' AS uid, null AS amt, 6 AS time_s UNION ALL
+ SELECT 'b' AS uid, 2 AS amt, 7 AS time_s
+ ) t
+ ORDER BY uid, time_s
+ ;
+ """
+
+ qt_select_default8 """
+ SELECT uid
+ ,amt
+ ,(FIRST_VALUE(amt, true) OVER(PARTITION BY uid ORDER BY time_s ROWS
between 3 following AND 6 FOLLOWING)) amt3
+ ,time_s
+ FROM (
+ SELECT 'a' AS uid, 1 AS amt, 0 AS time_s UNION ALL
+ SELECT 'a' AS uid, null AS amt, 1 AS time_s UNION ALL
+ SELECT 'a' AS uid, null AS amt, 2 AS time_s UNION ALL
+ SELECT 'a' AS uid, 2 AS amt, 3 AS time_s UNION ALL
+ SELECT 'b' AS uid, null AS amt, 4 AS time_s UNION ALL
+ SELECT 'b' AS uid, 3 AS amt, 5 AS time_s UNION ALL
+ SELECT 'b' AS uid, null AS amt, 6 AS time_s UNION ALL
+ SELECT 'b' AS uid, 2 AS amt, 7 AS time_s
+ ) t
+ ORDER BY uid, time_s;
+ """
+
+ qt_select_default9 """
+ SELECT uid
+ ,amt
+ ,(FIRST_VALUE(amt) OVER(PARTITION BY uid ORDER BY time_s ROWS between
3 following AND 6 FOLLOWING)) amt3
+ ,time_s
+ FROM (
+ SELECT 'a' AS uid, 1 AS amt, 0 AS time_s UNION ALL
+ SELECT 'a' AS uid, null AS amt, 1 AS time_s UNION ALL
+ SELECT 'a' AS uid, null AS amt, 2 AS time_s UNION ALL
+ SELECT 'a' AS uid, 2 AS amt, 3 AS time_s UNION ALL
+ SELECT 'b' AS uid, null AS amt, 4 AS time_s UNION ALL
+ SELECT 'b' AS uid, 3 AS amt, 5 AS time_s UNION ALL
+ SELECT 'b' AS uid, null AS amt, 6 AS time_s UNION ALL
+ SELECT 'b' AS uid, 2 AS amt, 7 AS time_s
+ ) t
+ ORDER BY uid, time_s;
+ """
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]