This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new b84aefa013f [Bug](outfile) fix missing result when parallel outfile enabled (#43205) (#44184)
b84aefa013f is described below
commit b84aefa013f22272150f3980832d93334328ebf6
Author: Pxl <[email protected]>
AuthorDate: Mon Nov 18 20:47:22 2024 +0800
[Bug](outfile) fix missing result when parallel outfile enabled (#43205) (#44184)
pick from #43205
---
be/src/pipeline/exec/operator.h | 4 ++--
be/src/pipeline/exec/result_file_sink_operator.cpp | 1 +
regression-test/suites/export_p0/test_outfile.groovy | 15 ++++++++++-----
3 files changed, 13 insertions(+), 7 deletions(-)
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index be54b7c4999..301f7599737 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -839,9 +839,9 @@ public:
template <typename Writer, typename Parent>
requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
-class AsyncWriterSink : public PipelineXSinkLocalState<FakeSharedState> {
+class AsyncWriterSink : public PipelineXSinkLocalState<BasicSharedState> {
public:
- using Base = PipelineXSinkLocalState<FakeSharedState>;
+ using Base = PipelineXSinkLocalState<BasicSharedState>;
AsyncWriterSink(DataSinkOperatorXBase* parent, RuntimeState* state)
: Base(parent, state), _async_writer_dependency(nullptr) {
_finish_dependency =
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp
index 7c9c38ece5c..bc4e4c88d14 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -95,6 +95,7 @@ Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& i
state->fragment_instance_id(), p._buf_size, &_sender,
state->execution_timeout(),
state->batch_size()));
}
+ _sender->set_dependency(state->fragment_instance_id(), _dependency->shared_from_this());
// create writer
_writer.reset(new (std::nothrow) vectorized::VFileResultWriter(
diff --git a/regression-test/suites/export_p0/test_outfile.groovy b/regression-test/suites/export_p0/test_outfile.groovy
index 8b60803e185..56ede909514 100644
--- a/regression-test/suites/export_p0/test_outfile.groovy
+++ b/regression-test/suites/export_p0/test_outfile.groovy
@@ -201,17 +201,22 @@ suite("test_outfile") {
`name` varchar(30)
) ENGINE=OLAP
DUPLICATE KEY(`id`)
- DISTRIBUTED BY HASH(`id`) BUCKETS 2
+ DISTRIBUTED BY HASH(`id`) BUCKETS 16
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);"""
sql """insert into select_into_file values(1, "b"),(2, "z"),(3, "a"),
(4, "c"), (5, "睿"), (6, "多"), (7, "丝"), (8, "test"),
- (100, "aa"), (111, "bb"), (123, "cc"), (222, "dd");"""
+ (100, "aa"), (111, "bb"), (123, "cc"), (222, "dd"),(1, "b"),(2, "z"),(3, "a"),
+ (44, "c"), (55, "睿"), (66, "多"), (77, "丝"), (88, "test"),
+ (1000, "aa"), (1111, "bb"), (1234, "cc"), (2222, "dd");"""
sql "set enable_parallel_outfile = true;"
- sql """select * from select_into_file into outfile "file://${outFilePath}/";"""
-
- sql """select * from select_into_file into outfile "file://${outFilePath}/" properties("success_file_name" = "SUCCESS");"""
+ sql "set parallel_pipeline_task_num=4;"
+ def result = sql """select * from select_into_file into outfile "file://${outFilePath}/";"""
+ assertEquals(4, result.size())
+ sql "set parallel_pipeline_task_num=8;"
+ result = sql """select * from select_into_file into outfile "file://${outFilePath}/" properties("success_file_name" = "SUCCESS");"""
+ assertEquals(8, result.size())
} finally {
try_sql("DROP TABLE IF EXISTS select_into_file")
File path = new File(outFilePath)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]