This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c67fa8651 [Bug](pipeline) fix bug of right anti join error result in pipeline (#15165)
7c67fa8651 is described below

commit 7c67fa865169494dc9ea1e48aa2d3d20d037e794
Author: HappenLee <[email protected]>
AuthorDate: Mon Dec 19 19:28:44 2022 +0800

    [Bug](pipeline) fix bug of right anti join error result in pipeline (#15165)
---
 be/src/pipeline/exec/operator.h          |  2 +-
 be/src/vec/exec/join/vhash_join_node.cpp | 55 +++++++++++++++++---------------
 2 files changed, 30 insertions(+), 27 deletions(-)

diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index bbbb42efda..4cd8346f88 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -428,7 +428,7 @@ public:
         if (node->need_more_input_data()) {
             RETURN_IF_ERROR(child->get_block(state, _child_block.get(), 
_child_source_state));
             source_state = _child_source_state;
-            if (_child_block->rows() == 0) {
+            if (_child_block->rows() == 0 && source_state != 
SourceState::FINISHED) {
                 return Status::OK();
             }
             node->prepare_for_next();
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index c76021c64b..586ee87c6f 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -551,34 +551,37 @@ Status HashJoinNode::pull(doris::RuntimeState* /*state*/, vectorized::Block* out
     return Status::OK();
 }
 
-Status HashJoinNode::push(RuntimeState* /*state*/, vectorized::Block* 
input_block, bool /*eos*/) {
-    COUNTER_UPDATE(_probe_rows_counter, _probe_block.rows());
-    int probe_expr_ctxs_sz = _probe_expr_ctxs.size();
-    _probe_columns.resize(probe_expr_ctxs_sz);
-
-    std::vector<int> res_col_ids(probe_expr_ctxs_sz);
-    RETURN_IF_ERROR(
-            _do_evaluate(*input_block, _probe_expr_ctxs, 
*_probe_expr_call_timer, res_col_ids));
-    if (_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == 
TJoinOp::FULL_OUTER_JOIN) {
-        _probe_column_convert_to_null = _convert_block_to_null(*input_block);
-    }
-    // TODO: Now we are not sure whether a column is nullable only by 
ExecNode's `row_desc`
-    //  so we have to initialize this flag by the first probe block.
-    if (!_has_set_need_null_map_for_probe) {
-        _has_set_need_null_map_for_probe = true;
-        _need_null_map_for_probe = _need_probe_null_map(*input_block, 
res_col_ids);
-    }
-    if (_need_null_map_for_probe) {
-        if (_null_map_column == nullptr) {
-            _null_map_column = ColumnUInt8::create();
+Status HashJoinNode::push(RuntimeState* /*state*/, vectorized::Block* 
input_block, bool eos) {
+    _probe_eos = eos;
+    if (input_block->rows() > 0) {
+        COUNTER_UPDATE(_probe_rows_counter, _probe_block.rows());
+        int probe_expr_ctxs_sz = _probe_expr_ctxs.size();
+        _probe_columns.resize(probe_expr_ctxs_sz);
+
+        std::vector<int> res_col_ids(probe_expr_ctxs_sz);
+        RETURN_IF_ERROR(
+                _do_evaluate(*input_block, _probe_expr_ctxs, 
*_probe_expr_call_timer, res_col_ids));
+        if (_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == 
TJoinOp::FULL_OUTER_JOIN) {
+            _probe_column_convert_to_null = 
_convert_block_to_null(*input_block);
+        }
+        // TODO: Now we are not sure whether a column is nullable only by 
ExecNode's `row_desc`
+        //  so we have to initialize this flag by the first probe block.
+        if (!_has_set_need_null_map_for_probe) {
+            _has_set_need_null_map_for_probe = true;
+            _need_null_map_for_probe = _need_probe_null_map(*input_block, 
res_col_ids);
+        }
+        if (_need_null_map_for_probe) {
+            if (_null_map_column == nullptr) {
+                _null_map_column = ColumnUInt8::create();
+            }
+            _null_map_column->get_data().assign(input_block->rows(), 
(uint8_t)0);
         }
-        _null_map_column->get_data().assign(input_block->rows(), (uint8_t)0);
-    }
 
-    RETURN_IF_ERROR(_extract_join_column<false>(*input_block, 
_null_map_column, _probe_columns,
-                                                res_col_ids));
-    if (&_probe_block != input_block) {
-        input_block->swap(_probe_block);
+        RETURN_IF_ERROR(_extract_join_column<false>(*input_block, 
_null_map_column, _probe_columns,
+                                                    res_col_ids));
+        if (&_probe_block != input_block) {
+            input_block->swap(_probe_block);
+        }
     }
     return Status::OK();
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to