This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0732f31e5d [Bug](pipeline) Fix bugs for scan node and join node 
(#15164)
0732f31e5d is described below

commit 0732f31e5def5c714c907a5fbef4dc6860be354c
Author: Gabriel <[email protected]>
AuthorDate: Mon Dec 19 15:59:29 2022 +0800

    [Bug](pipeline) Fix bugs for scan node and join node (#15164)
    
    * [Bug](pipeline) Fix bugs for scan node and join node
    
    * update
---
 be/src/pipeline/exec/operator.h          | 12 ++++++++++--
 be/src/vec/exec/join/vhash_join_node.cpp |  2 +-
 2 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 4679832bb1..bbbb42efda 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -237,7 +237,6 @@ protected:
     // TODO pipeline Account for peak memory used by this operator
     RuntimeProfile::Counter* _memory_used_counter = nullptr;
 
-private:
     bool _is_closed = false;
 };
 
@@ -283,8 +282,13 @@ public:
     }
 
     Status close(RuntimeState* state) override {
+        if (is_closed()) {
+            return Status::OK();
+        }
         _fresh_exec_timer(_sink);
-        return _sink->close(state, Status::OK());
+        RETURN_IF_ERROR(_sink->close(state, Status::OK()));
+        _is_closed = true;
+        return Status::OK();
     }
 
     Status finalize(RuntimeState* state) override { return Status::OK(); }
@@ -336,10 +340,14 @@ public:
     }
 
     Status close(RuntimeState* state) override {
+        if (is_closed()) {
+            return Status::OK();
+        }
         _fresh_exec_timer(_node);
         if (!_node->decrease_ref()) {
             _node->release_resource(state);
         }
+        _is_closed = true;
         return Status::OK();
     }
 
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp 
b/be/src/vec/exec/join/vhash_join_node.cpp
index 80e47089f7..c76021c64b 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -572,7 +572,7 @@ Status HashJoinNode::push(RuntimeState* /*state*/, 
vectorized::Block* input_bloc
         if (_null_map_column == nullptr) {
             _null_map_column = ColumnUInt8::create();
         }
-        _null_map_column->get_data().assign(_probe_block.rows(), (uint8_t)0);
+        _null_map_column->get_data().assign(input_block->rows(), (uint8_t)0);
     }
 
     RETURN_IF_ERROR(_extract_join_column<false>(*input_block, 
_null_map_column, _probe_columns,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to