This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b3a9c247af [refactor](move-memtable) add load stream stub (#23642)
b3a9c247af is described below
commit b3a9c247af89fa9b5eaa7e8463cf589e4b02d9fd
Author: Kaijie Chen <[email protected]>
AuthorDate: Thu Aug 31 19:39:34 2023 +0800
[refactor](move-memtable) add load stream stub (#23642)
---
be/src/io/fs/stream_sink_file_writer.cpp | 60 ++-----
be/src/io/fs/stream_sink_file_writer.h | 36 +---
be/src/olap/delta_writer_context.h | 6 -
be/src/olap/delta_writer_v2.cpp | 24 ++-
be/src/olap/delta_writer_v2.h | 10 +-
be/src/olap/rowset/beta_rowset_writer_v2.cpp | 22 +--
be/src/olap/rowset/beta_rowset_writer_v2.h | 6 +-
be/src/olap/rowset/rowset_writer_context.h | 3 -
be/src/vec/sink/load_stream_stub.cpp | 232 +++++++++++++++++++++++++
be/src/vec/sink/load_stream_stub.h | 192 ++++++++++++++++++++
be/src/vec/sink/vtablet_sink_v2.cpp | 228 +++++-------------------
be/src/vec/sink/vtablet_sink_v2.h | 20 +--
be/test/io/fs/stream_sink_file_writer_test.cpp | 161 ++++++-----------
13 files changed, 578 insertions(+), 422 deletions(-)
diff --git a/be/src/io/fs/stream_sink_file_writer.cpp
b/be/src/io/fs/stream_sink_file_writer.cpp
index ff2667b9fa..6726933513 100644
--- a/be/src/io/fs/stream_sink_file_writer.cpp
+++ b/be/src/io/fs/stream_sink_file_writer.cpp
@@ -21,6 +21,7 @@
#include "olap/olap_common.h"
#include "olap/rowset/beta_rowset_writer.h"
+#include "vec/sink/load_stream_stub.h"
namespace doris {
namespace io {
@@ -35,38 +36,24 @@ void StreamSinkFileWriter::init(PUniqueId load_id, int64_t
partition_id, int64_t
_index_id = index_id;
_tablet_id = tablet_id;
_segment_id = segment_id;
-
- _header.set_src_id(_sender_id);
- *_header.mutable_load_id() = _load_id;
- _header.set_partition_id(_partition_id);
- _header.set_index_id(_index_id);
- _header.set_tablet_id(_tablet_id);
- _header.set_segment_id(_segment_id);
- _header.set_opcode(doris::PStreamHeader::APPEND_DATA);
- _append_header();
}
Status StreamSinkFileWriter::appendv(const Slice* data, size_t data_cnt) {
size_t bytes_req = 0;
for (int i = 0; i < data_cnt; i++) {
bytes_req += data[i].get_size();
- _buf.append(data[i].get_data(), data[i].get_size());
}
- _pending_bytes += bytes_req;
_bytes_appended += bytes_req;
VLOG_DEBUG << "writer appendv, load_id: " << UniqueId(_load_id).to_string()
<< ", index_id: " << _index_id << ", tablet_id: " << _tablet_id
- << ", segment_id: " << _segment_id << ", data_length: " <<
bytes_req
- << ", current batched bytes: " << _pending_bytes;
+ << ", segment_id: " << _segment_id << ", data_length: " <<
bytes_req;
- if (_pending_bytes >= _max_pending_bytes) {
- RETURN_IF_ERROR(_stream_sender(_buf));
- _buf.clear();
- _append_header();
- _pending_bytes = 0;
+ std::span<const Slice> slices {data, data_cnt};
+ for (auto& stream : _streams) {
+ RETURN_IF_ERROR(
+ stream->append_data(_partition_id, _index_id, _tablet_id,
_segment_id, slices));
}
-
return Status::OK();
}
@@ -75,38 +62,11 @@ Status StreamSinkFileWriter::finalize() {
<< ", index_id: " << _index_id << ", tablet_id: " << _tablet_id
<< ", segment_id: " << _segment_id;
// TODO(zhengyu): update get_inverted_index_file_size into stat
- Status status = _stream_sender(_buf);
- // send eos
- _buf.clear();
- _header.set_segment_eos(true);
- _append_header();
- status = _stream_sender(_buf);
- return status;
-}
-
-void StreamSinkFileWriter::_append_header() {
- size_t header_len = _header.ByteSizeLong();
- _buf.append(reinterpret_cast<uint8_t*>(&header_len), sizeof(header_len));
- _buf.append(_header.SerializeAsString());
-}
-
-Status StreamSinkFileWriter::send_with_retry(brpc::StreamId stream,
butil::IOBuf buf) {
- while (true) {
- int ret = brpc::StreamWrite(stream, buf);
- if (ret == EAGAIN) {
- const timespec time = butil::seconds_from_now(60);
- int wait_result = brpc::StreamWait(stream, &time);
- if (wait_result == 0) {
- continue;
- } else {
- return Status::InternalError("fail to send data when wait
stream");
- }
- } else if (ret == EINVAL) {
- return Status::InternalError("fail to send data when stream
write");
- } else {
- return Status::OK();
- }
+ for (auto& stream : _streams) {
+ RETURN_IF_ERROR(
+ stream->append_data(_partition_id, _index_id, _tablet_id,
_segment_id, {}, true));
}
+ return Status::OK();
}
Status StreamSinkFileWriter::close() {
diff --git a/be/src/io/fs/stream_sink_file_writer.h
b/be/src/io/fs/stream_sink_file_writer.h
index 074a2f0f5a..6c40543b93 100644
--- a/be/src/io/fs/stream_sink_file_writer.h
+++ b/be/src/io/fs/stream_sink_file_writer.h
@@ -27,18 +27,16 @@
namespace doris {
+class LoadStreamStub;
+
struct RowsetId;
struct SegmentStatistics;
namespace io {
class StreamSinkFileWriter : public FileWriter {
public:
- StreamSinkFileWriter(int sender_id, std::vector<brpc::StreamId> stream_id)
- : _sender_id(sender_id), _streams(stream_id) {}
-
- static void deleter(void* data) { ::free(data); }
-
- static Status send_with_retry(brpc::StreamId stream, butil::IOBuf buf);
+ StreamSinkFileWriter(std::vector<std::shared_ptr<LoadStreamStub>>& streams)
+ : _streams(streams) {}
void init(PUniqueId load_id, int64_t partition_id, int64_t index_id,
int64_t tablet_id,
int32_t segment_id);
@@ -58,30 +56,10 @@ public:
}
private:
- Status _stream_sender(butil::IOBuf buf) const {
- for (auto stream : _streams) {
- LOG(INFO) << "writer flushing, load_id: " <<
UniqueId(_load_id).to_string()
- << ", index_id: " << _index_id << ", tablet_id: " <<
_tablet_id
- << ", segment_id: " << _segment_id << ", stream id: " <<
stream
- << ", buf size: " << buf.size();
-
- RETURN_IF_ERROR(send_with_retry(stream, buf));
- }
- return Status::OK();
- }
-
- void _append_header();
-
-private:
- PStreamHeader _header;
- butil::IOBuf _buf;
-
- std::queue<Slice> _pending_slices;
- size_t _max_pending_bytes = config::brpc_streaming_client_batch_bytes;
- size_t _pending_bytes;
+ template <bool eos>
+ Status _flush();
- int _sender_id;
- std::vector<brpc::StreamId> _streams;
+ std::vector<std::shared_ptr<LoadStreamStub>> _streams;
PUniqueId _load_id;
int64_t _partition_id;
diff --git a/be/src/olap/delta_writer_context.h
b/be/src/olap/delta_writer_context.h
index 680f2d0b6f..da506b10b5 100644
--- a/be/src/olap/delta_writer_context.h
+++ b/be/src/olap/delta_writer_context.h
@@ -28,7 +28,6 @@ namespace doris {
class TupleDescriptor;
class SlotDescriptor;
class OlapTableSchemaParam;
-class TabletSchema;
struct WriteRequest {
int64_t tablet_id;
@@ -42,11 +41,6 @@ struct WriteRequest {
bool is_high_priority = false;
OlapTableSchemaParam* table_schema_param;
int64_t index_id = 0;
- // for DeltaWriterV2
- std::shared_ptr<TabletSchema> tablet_schema;
- bool enable_unique_key_merge_on_write = false;
- int sender_id = 0;
- std::vector<brpc::StreamId> streams;
};
} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index 6f42ae068e..ff1a341f5e 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -58,23 +58,27 @@
#include "util/stopwatch.hpp"
#include "util/time.h"
#include "vec/core/block.h"
+#include "vec/sink/load_stream_stub.h"
namespace doris {
using namespace ErrorCode;
-Status DeltaWriterV2::open(WriteRequest* req, DeltaWriterV2** writer,
RuntimeProfile* profile) {
- *writer = new DeltaWriterV2(req, StorageEngine::instance(), profile);
+Status DeltaWriterV2::open(WriteRequest* req,
+ const std::vector<std::shared_ptr<LoadStreamStub>>&
streams,
+ DeltaWriterV2** writer, RuntimeProfile* profile) {
+ *writer = new DeltaWriterV2(req, streams, StorageEngine::instance(),
profile);
return Status::OK();
}
-DeltaWriterV2::DeltaWriterV2(WriteRequest* req, StorageEngine* storage_engine,
- RuntimeProfile* profile)
+DeltaWriterV2::DeltaWriterV2(WriteRequest* req,
+ const
std::vector<std::shared_ptr<LoadStreamStub>>& streams,
+ StorageEngine* storage_engine, RuntimeProfile*
profile)
: _req(*req),
_tablet_schema(new TabletSchema),
_profile(profile->create_child(fmt::format("DeltaWriterV2 {}",
_req.tablet_id), true,
true)),
_memtable_writer(new MemTableWriter(*req)),
- _streams(req->streams) {
+ _streams(streams) {
_init_profile(profile);
}
@@ -97,7 +101,8 @@ Status DeltaWriterV2::init() {
return Status::OK();
}
// build tablet schema in request level
- _build_current_tablet_schema(_req.index_id, _req.table_schema_param,
*_req.tablet_schema.get());
+ _build_current_tablet_schema(_req.index_id, _req.table_schema_param,
+ *_streams[0]->tablet_schema(_req.index_id));
RowsetWriterContext context;
context.txn_id = _req.txn_id;
context.load_id = _req.load_id;
@@ -112,17 +117,18 @@ Status DeltaWriterV2::init() {
context.tablet_id = _req.tablet_id;
context.partition_id = _req.partition_id;
context.tablet_schema_hash = _req.schema_hash;
- context.enable_unique_key_merge_on_write =
_req.enable_unique_key_merge_on_write;
+ context.enable_unique_key_merge_on_write =
_streams[0]->enable_unique_mow(_req.index_id);
context.rowset_type = RowsetTypePB::BETA_ROWSET;
context.rowset_id = StorageEngine::instance()->next_rowset_id();
context.data_dir = nullptr;
- context.sender_id = _req.sender_id;
_rowset_writer = std::make_shared<BetaRowsetWriterV2>(_streams);
_rowset_writer->init(context);
- _memtable_writer->init(_rowset_writer, _tablet_schema,
_req.enable_unique_key_merge_on_write);
+ _memtable_writer->init(_rowset_writer, _tablet_schema,
+ _streams[0]->enable_unique_mow(_req.index_id));
ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer);
_is_init = true;
+ _streams.clear();
return Status::OK();
}
diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h
index 0d78162035..741d939fa8 100644
--- a/be/src/olap/delta_writer_v2.h
+++ b/be/src/olap/delta_writer_v2.h
@@ -52,6 +52,7 @@ class TupleDescriptor;
class SlotDescriptor;
class OlapTableSchemaParam;
class BetaRowsetWriterV2;
+class LoadStreamStub;
namespace vectorized {
class Block;
@@ -61,7 +62,9 @@ class Block;
// This class is NOT thread-safe, external synchronization is required.
class DeltaWriterV2 {
public:
- static Status open(WriteRequest* req, DeltaWriterV2** writer,
RuntimeProfile* profile);
+ static Status open(WriteRequest* req,
+ const std::vector<std::shared_ptr<LoadStreamStub>>&
streams,
+ DeltaWriterV2** writer, RuntimeProfile* profile);
~DeltaWriterV2();
@@ -95,7 +98,8 @@ public:
int64_t total_received_rows() const { return _total_received_rows; }
private:
- DeltaWriterV2(WriteRequest* req, StorageEngine* storage_engine,
RuntimeProfile* profile);
+ DeltaWriterV2(WriteRequest* req, const
std::vector<std::shared_ptr<LoadStreamStub>>& streams,
+ StorageEngine* storage_engine, RuntimeProfile* profile);
void _build_current_tablet_schema(int64_t index_id,
const OlapTableSchemaParam*
table_schema_param,
@@ -122,7 +126,7 @@ private:
std::shared_ptr<MemTableWriter> _memtable_writer;
MonotonicStopWatch _lock_watch;
- std::vector<brpc::StreamId> _streams;
+ std::vector<std::shared_ptr<LoadStreamStub>> _streams;
};
} // namespace doris
diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.cpp
b/be/src/olap/rowset/beta_rowset_writer_v2.cpp
index d7a641b6e0..a4bc32e32b 100644
--- a/be/src/olap/rowset/beta_rowset_writer_v2.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer_v2.cpp
@@ -54,11 +54,12 @@
#include "util/time.h"
#include "vec/common/schema_util.h" // LocalSchemaChangeRecorder
#include "vec/core/block.h"
+#include "vec/sink/load_stream_stub.h"
namespace doris {
using namespace ErrorCode;
-BetaRowsetWriterV2::BetaRowsetWriterV2(const std::vector<brpc::StreamId>&
streams)
+BetaRowsetWriterV2::BetaRowsetWriterV2(const
std::vector<std::shared_ptr<LoadStreamStub>>& streams)
: _next_segment_id(0),
_num_segment(0),
_num_rows_written(0),
@@ -78,33 +79,20 @@ Status BetaRowsetWriterV2::init(const RowsetWriterContext&
rowset_writer_context
Status BetaRowsetWriterV2::create_file_writer(uint32_t segment_id,
io::FileWriterPtr& file_writer) {
auto partition_id = _context.partition_id;
- auto sender_id = _context.sender_id;
auto index_id = _context.index_id;
auto tablet_id = _context.tablet_id;
auto load_id = _context.load_id;
- auto stream_writer = std::make_unique<io::StreamSinkFileWriter>(sender_id,
_streams);
+ auto stream_writer = std::make_unique<io::StreamSinkFileWriter>(_streams);
stream_writer->init(load_id, partition_id, index_id, tablet_id,
segment_id);
file_writer = std::move(stream_writer);
return Status::OK();
}
Status BetaRowsetWriterV2::add_segment(uint32_t segment_id, SegmentStatistics&
segstat) {
- butil::IOBuf buf;
- PStreamHeader header;
- header.set_src_id(_context.sender_id);
- *header.mutable_load_id() = _context.load_id;
- header.set_partition_id(_context.partition_id);
- header.set_index_id(_context.index_id);
- header.set_tablet_id(_context.tablet_id);
- header.set_segment_id(segment_id);
- header.set_opcode(doris::PStreamHeader::ADD_SEGMENT);
- segstat.to_pb(header.mutable_segment_statistics());
- size_t header_len = header.ByteSizeLong();
- buf.append(reinterpret_cast<uint8_t*>(&header_len), sizeof(header_len));
- buf.append(header.SerializeAsString());
for (const auto& stream : _streams) {
- io::StreamSinkFileWriter::send_with_retry(stream, buf);
+ RETURN_IF_ERROR(stream->add_segment(_context.partition_id,
_context.index_id,
+ _context.tablet_id, segment_id,
segstat));
}
return Status::OK();
}
diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.h
b/be/src/olap/rowset/beta_rowset_writer_v2.h
index 919c128607..a982272217 100644
--- a/be/src/olap/rowset/beta_rowset_writer_v2.h
+++ b/be/src/olap/rowset/beta_rowset_writer_v2.h
@@ -60,9 +60,11 @@ namespace vectorized::schema_util {
class LocalSchemaChangeRecorder;
}
+class LoadStreamStub;
+
class BetaRowsetWriterV2 : public RowsetWriter {
public:
- BetaRowsetWriterV2(const std::vector<brpc::StreamId>& streams);
+ BetaRowsetWriterV2(const std::vector<std::shared_ptr<LoadStreamStub>>&
streams);
~BetaRowsetWriterV2() override;
@@ -157,7 +159,7 @@ private:
fmt::memory_buffer vlog_buffer;
- std::vector<brpc::StreamId> _streams;
+ std::vector<std::shared_ptr<LoadStreamStub>> _streams;
int64_t _delete_bitmap_ns = 0;
int64_t _segment_writer_ns = 0;
diff --git a/be/src/olap/rowset/rowset_writer_context.h
b/be/src/olap/rowset/rowset_writer_context.h
index b8bfd1225c..cb78a1233a 100644
--- a/be/src/olap/rowset/rowset_writer_context.h
+++ b/be/src/olap/rowset/rowset_writer_context.h
@@ -45,7 +45,6 @@ struct RowsetWriterContext {
rowset_type(BETA_ROWSET),
rowset_state(PREPARED),
version(Version(0, 0)),
- sender_id(0),
txn_id(0),
tablet_uid(0, 0),
segments_overlap(OVERLAP_UNKNOWN) {
@@ -68,8 +67,6 @@ struct RowsetWriterContext {
// properties for non-pending rowset
Version version;
- int sender_id;
-
// properties for pending rowset
int64_t txn_id;
PUniqueId load_id;
diff --git a/be/src/vec/sink/load_stream_stub.cpp
b/be/src/vec/sink/load_stream_stub.cpp
new file mode 100644
index 0000000000..1fbc8228b2
--- /dev/null
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -0,0 +1,232 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/sink/load_stream_stub.h"
+
+#include "olap/rowset/rowset_writer.h"
+#include "util/brpc_client_cache.h"
+#include "util/network_util.h"
+#include "util/thrift_util.h"
+
+namespace doris {
+
+// Handles response messages pushed by the remote backend over this stream.
+// For each message: records the reported success / failed tablet ids on the
+// owning stub (each list under its own mutex), logs a one-line summary, and
+// tries to deserialize an attached load-stream profile (currently unused, see TODO).
+// Messages that cannot be parsed are logged and skipped instead of being treated
+// as an empty response. Always returns 0.
+int LoadStreamStub::LoadStreamReplyHandler::on_received_messages(brpc::StreamId id,
+                                                                  butil::IOBuf* const messages[],
+                                                                  size_t size) {
+    for (size_t i = 0; i < size; i++) {
+        butil::IOBufAsZeroCopyInputStream wrapper(*messages[i]);
+        PWriteStreamSinkResponse response;
+        if (!response.ParseFromZeroCopyStream(&wrapper)) {
+            // A corrupt/truncated message must not be reported as an (empty) OK response.
+            LOG(WARNING) << "failed to parse response from backend " << _stub->_dst_id
+                         << ", stream id: " << id;
+            continue;
+        }
+
+        Status st = Status::create(response.status());
+
+        std::stringstream ss;
+        ss << "received response from backend " << _stub->_dst_id;
+        if (response.success_tablet_ids_size() > 0) {
+            ss << ", success tablet ids:";
+            for (auto tablet_id : response.success_tablet_ids()) {
+                ss << " " << tablet_id;
+            }
+            std::lock_guard<bthread::Mutex> lock(_stub->_success_tablets_mutex);
+            for (auto tablet_id : response.success_tablet_ids()) {
+                _stub->_success_tablets.push_back(tablet_id);
+            }
+        }
+        if (response.failed_tablet_ids_size() > 0) {
+            ss << ", failed tablet ids:";
+            for (auto tablet_id : response.failed_tablet_ids()) {
+                ss << " " << tablet_id;
+            }
+            std::lock_guard<bthread::Mutex> lock(_stub->_failed_tablets_mutex);
+            for (auto tablet_id : response.failed_tablet_ids()) {
+                _stub->_failed_tablets.push_back(tablet_id);
+            }
+        }
+        ss << ", status: " << st;
+        LOG(INFO) << ss.str();
+
+        if (response.has_load_stream_profile()) {
+            TRuntimeProfileTree tprofile;
+            const uint8_t* buf =
+                    reinterpret_cast<const uint8_t*>(response.load_stream_profile().data());
+            uint32_t len = response.load_stream_profile().size();
+            auto status = deserialize_thrift_msg(buf, &len, false, &tprofile);
+            if (status.ok()) {
+                // TODO
+                //_sink->_state->load_channel_profile()->update(tprofile);
+            } else {
+                LOG(WARNING) << "load stream TRuntimeProfileTree deserialize failed, errmsg="
+                             << status;
+            }
+        }
+    }
+    return 0;
+}
+
+// brpc callback for stream closure: flags the owning stub as closed and wakes
+// every thread blocked on its close condition variable (see close_wait()).
+void LoadStreamStub::LoadStreamReplyHandler::on_closed(brpc::StreamId id) {
+    LoadStreamStub* const owner = _stub;
+    std::lock_guard<bthread::Mutex> guard(owner->_mutex);
+    owner->_is_closed = true;
+    owner->_close_cv.notify_all();
+}
+
+// Builds a brand-new stub for (load_id, src_id) owning freshly allocated, empty
+// per-index tablet-schema and merge-on-write maps.
+LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id)
+        : _load_id(load_id),
+          _src_id(src_id),
+          _tablet_schema_for_index(std::make_shared<IndexToTabletSchema>()),
+          _enable_unique_mow_for_index(std::make_shared<IndexToEnableMoW>()) {}
+
+// "Copy" constructor: reuses the load id and source id of `stub` and shares its
+// per-index schema / merge-on-write maps (the shared_ptrs are copied, not the maps).
+// Everything else (stream, init/closed flags, mutexes, tablet lists) starts fresh.
+LoadStreamStub::LoadStreamStub(LoadStreamStub& stub)
+        : _load_id(stub._load_id),
+          _src_id(stub._src_id),
+          _tablet_schema_for_index(stub._tablet_schema_for_index),
+          _enable_unique_mow_for_index(stub._enable_unique_mow_for_index) {}
+
+// Closes the stream if open() succeeded and the stream has not been reported
+// closed yet.
+// NOTE(review): StreamClose() runs while _mutex is held, and the reply handler's
+// on_closed() locks _mutex and dereferences this stub. Confirm that brpc neither
+// invokes on_closed() synchronously here (self-deadlock) nor after this object
+// is destroyed (use-after-free).
+LoadStreamStub::~LoadStreamStub() {
+    std::unique_lock<bthread::Mutex> lock(_mutex);
+    const bool still_open = _is_init && !_is_closed;
+    if (!still_open) {
+        return;
+    }
+    brpc::StreamClose(_stream_id);
+}
+
+// open_stream_sink: creates the brpc stream toward `node_info` and issues the
+// open_stream_sink RPC on a pooled connection.
+// `tablets_for_schema` is forwarded in the request; presumably the remote uses it
+// to decide which index schemas to return (TODO confirm against the server side).
+// On success, the returned tablet schemas and merge-on-write flags are cached by
+// index id in this stub's per-index maps. These maps may be shared with other
+// stubs built via the sharing copy constructor.
+// Calling open() on an already-opened stub is a no-op.
+Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
+                            const NodeInfo& node_info, int64_t txn_id,
+                            const OlapTableSchemaParam& schema,
+                            const std::vector<PTabletID>& tablets_for_schema, bool enable_profile) {
+    std::unique_lock<bthread::Mutex> lock(_mutex);
+    if (_is_init) {
+        return Status::OK();
+    }
+    _dst_id = node_info.id;
+    std::string host_port = get_host_port(node_info.host, node_info.brpc_port);
+    brpc::StreamOptions opt;
+    opt.max_buf_size = 20 << 20; // 20MB
+    opt.idle_timeout_ms = 30000;
+    opt.messages_in_batch = 128;
+    opt.handler = &_handler;
+    brpc::Controller cntl;
+    if (int ret = brpc::StreamCreate(&_stream_id, cntl, &opt)) {
+        return Status::Error<true>(ret, "Failed to create stream");
+    }
+    cntl.set_timeout_ms(config::open_stream_sink_timeout_ms);
+    POpenStreamSinkRequest request;
+    *request.mutable_load_id() = _load_id;
+    request.set_src_id(_src_id);
+    request.set_txn_id(txn_id);
+    request.set_enable_profile(enable_profile);
+    schema.to_protobuf(request.mutable_schema());
+    for (auto& tablet : tablets_for_schema) {
+        *request.add_tablets() = tablet;
+    }
+    POpenStreamSinkResponse response;
+    // use "pooled" connection to avoid conflicts between streaming rpc and regular rpc,
+    // see: https://github.com/apache/brpc/issues/392
+    const auto& stub = client_cache->get_new_client_no_cache(host_port, "baidu_std", "pooled");
+    stub->open_stream_sink(&cntl, &request, &response, nullptr);
+    // Check the RPC outcome before consuming the response. A failed call must not
+    // leave partially-filled entries in the (possibly shared) per-index maps.
+    if (cntl.Failed()) {
+        return Status::InternalError("Failed to connect to backend {}: {}", _dst_id,
+                                     cntl.ErrorText());
+    }
+    for (const auto& resp : response.tablet_schemas()) {
+        auto tablet_schema = std::make_unique<TabletSchema>();
+        tablet_schema->init_from_pb(resp.tablet_schema());
+        _tablet_schema_for_index->emplace(resp.index_id(), std::move(tablet_schema));
+        _enable_unique_mow_for_index->emplace(resp.index_id(),
+                                              resp.enable_unique_key_merge_on_write());
+    }
+    LOG(INFO) << "Opened stream " << _stream_id << " for backend " << _dst_id << " (" << host_port
+              << ")";
+    _is_init = true;
+    return Status::OK();
+}
+
+// APPEND_DATA: ships the `data` slices for one segment of a tablet.
+// `segment_eos` flags the end of that segment (the file writer's finalize sends
+// it with an empty payload).
+Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id,
+                                   int64_t segment_id, std::span<const Slice> data,
+                                   bool segment_eos) {
+    PStreamHeader hdr;
+    hdr.set_opcode(doris::PStreamHeader::APPEND_DATA);
+    hdr.set_src_id(_src_id);
+    hdr.mutable_load_id()->CopyFrom(_load_id);
+    hdr.set_partition_id(partition_id);
+    hdr.set_index_id(index_id);
+    hdr.set_tablet_id(tablet_id);
+    hdr.set_segment_id(segment_id);
+    hdr.set_segment_eos(segment_eos);
+    return _encode_and_send(hdr, data);
+}
+
+// ADD_SEGMENT: reports segment `segment_id` of a tablet together with its
+// statistics; the message carries no data payload.
+Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id,
+                                   int64_t segment_id, SegmentStatistics& segment_stat) {
+    PStreamHeader hdr;
+    hdr.set_opcode(doris::PStreamHeader::ADD_SEGMENT);
+    hdr.set_src_id(_src_id);
+    hdr.mutable_load_id()->CopyFrom(_load_id);
+    hdr.set_partition_id(partition_id);
+    hdr.set_index_id(index_id);
+    hdr.set_tablet_id(tablet_id);
+    hdr.set_segment_id(segment_id);
+    segment_stat.to_pb(hdr.mutable_segment_statistics());
+    return _encode_and_send(hdr);
+}
+
+// CLOSE_LOAD: sends the list of tablets to commit for this load/sender.
+// Per close_wait(), the remote closes the stream once it receives this message.
+Status LoadStreamStub::close_load(const std::vector<PTabletID>& tablets_to_commit) {
+    PStreamHeader hdr;
+    hdr.set_opcode(doris::PStreamHeader::CLOSE_LOAD);
+    hdr.set_src_id(_src_id);
+    hdr.mutable_load_id()->CopyFrom(_load_id);
+    for (const PTabletID& tablet : tablets_to_commit) {
+        hdr.add_tablets_to_commit()->CopyFrom(tablet);
+    }
+    return _encode_and_send(hdr);
+}
+
+// Frames one stream message as:
+//   [header length as a raw host-order size_t][serialized header][payload slices...]
+// and writes it to the stream via _send_with_retry().
+Status LoadStreamStub::_encode_and_send(PStreamHeader& header, std::span<const Slice> data) {
+    butil::IOBuf frame;
+    const size_t header_len = header.ByteSizeLong();
+    frame.append(&header_len, sizeof(header_len));
+    frame.append(header.SerializeAsString());
+    for (const Slice& piece : data) {
+        frame.append(piece.get_data(), piece.get_size());
+    }
+    return _send_with_retry(frame);
+}
+
+// Writes `buf` to the stream. When brpc::StreamWrite reports EAGAIN (the stream's
+// write buffer is full), waits up to 60 seconds for space and retries.
+// Returns InternalError carrying the brpc error code if waiting fails or times out,
+// or if StreamWrite returns any other non-zero code.
+Status LoadStreamStub::_send_with_retry(butil::IOBuf buf) {
+    for (;;) {
+        int ret = brpc::StreamWrite(_stream_id, buf);
+        switch (ret) {
+        case 0:
+            return Status::OK();
+        case EAGAIN: {
+            const timespec time = butil::seconds_from_now(60);
+            int wait_ret = brpc::StreamWait(_stream_id, &time);
+            if (wait_ret != 0) {
+                // `{}` placeholder was missing, which dropped the error code from the message.
+                return Status::InternalError("StreamWait failed, err = {}", wait_ret);
+            }
+            break;
+        }
+        default:
+            return Status::InternalError("StreamWrite failed, err = {}", ret);
+        }
+    }
+}
+
+} // namespace doris
diff --git a/be/src/vec/sink/load_stream_stub.h
b/be/src/vec/sink/load_stream_stub.h
new file mode 100644
index 0000000000..268de6ca83
--- /dev/null
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <brpc/controller.h>
+#include <bthread/types.h>
+#include <butil/errno.h>
+#include <fmt/format.h>
+#include <gen_cpp/PaloInternalService_types.h>
+#include <gen_cpp/Types_types.h>
+#include <gen_cpp/internal_service.pb.h>
+#include <gen_cpp/types.pb.h>
+#include <glog/logging.h>
+#include <google/protobuf/stubs/callback.h>
+#include <stddef.h>
+#include <stdint.h>
+
+#include <atomic>
+// IWYU pragma: no_include <bits/chrono.h>
+#include <chrono> // IWYU pragma: keep
+#include <functional>
+#include <initializer_list>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <ostream>
+#include <queue>
+#include <set>
+#include <span>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "common/config.h"
+#include "common/status.h"
+#include "exec/data_sink.h"
+#include "exec/tablet_info.h"
+#include "gutil/ref_counted.h"
+#include "runtime/exec_env.h"
+#include "runtime/memory/mem_tracker.h"
+#include "runtime/thread_context.h"
+#include "runtime/types.h"
+#include "util/countdown_latch.h"
+#include "util/runtime_profile.h"
+#include "util/stopwatch.hpp"
+#include "vec/columns/column.h"
+#include "vec/common/allocator.h"
+#include "vec/core/block.h"
+#include "vec/data_types/data_type.h"
+#include "vec/exprs/vexpr_fwd.h"
+
+namespace doris {
+class TabletSchema;
+class LoadStreamStub;
+
+struct SegmentStatistics;
+
+using IndexToTabletSchema = std::unordered_map<int64_t,
std::shared_ptr<TabletSchema>>;
+using IndexToEnableMoW = std::unordered_map<int64_t, bool>;
+
+// Client-side stub for one brpc stream from this sender to one destination backend.
+// It encodes and sends PStreamHeader messages (APPEND_DATA / ADD_SEGMENT / CLOSE_LOAD),
+// collects success/failed tablet ids reported back by the remote, and caches the
+// per-index tablet schema and merge-on-write flags returned by open().
+class LoadStreamStub {
+private:
+    // brpc input handler that forwards stream events (responses, close) to the owning stub.
+    class LoadStreamReplyHandler : public brpc::StreamInputHandler {
+    public:
+        explicit LoadStreamReplyHandler(LoadStreamStub* stub) : _stub(stub) {}
+
+        int on_received_messages(brpc::StreamId id, butil::IOBuf* const messages[],
+                                 size_t size) override;
+
+        void on_idle_timeout(brpc::StreamId id) override {}
+
+        void on_closed(brpc::StreamId id) override;
+
+    private:
+        LoadStreamStub* _stub;
+    };
+
+public:
+    // construct new stub
+    LoadStreamStub(PUniqueId load_id, int64_t src_id);
+
+    // copy constructor, shared_ptr members are shared
+    LoadStreamStub(LoadStreamStub& stub);
+
+// for mock this class in UT
+#ifdef BE_TEST
+    virtual
+#endif
+    ~LoadStreamStub();
+
+    // open_stream_sink
+    Status open(BrpcClientCache<PBackendService_Stub>* client_cache, const NodeInfo& node_info,
+                int64_t txn_id, const OlapTableSchemaParam& schema,
+                const std::vector<PTabletID>& tablets_for_schema, bool enable_profile);
+
+    // APPEND_DATA (virtual in BE_TEST builds so unit tests can mock it)
+#ifdef BE_TEST
+    virtual
+#endif
+    Status append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id,
+                       int64_t segment_id, std::span<const Slice> data, bool segment_eos = false);
+
+    // ADD_SEGMENT
+    Status add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id,
+                       int64_t segment_id, SegmentStatistics& segment_stat);
+
+    // CLOSE_LOAD
+    Status close_load(const std::vector<PTabletID>& tablets_to_commit);
+
+    // Waits for the remote to close the stream; the remote closes it when it
+    // receives CLOSE_LOAD. Returns OK immediately if the stream was never opened or
+    // is already closed. With timeout_ms > 0 the total wait is bounded by timeout_ms
+    // and a timeout yields an error; otherwise it waits indefinitely.
+    Status close_wait(int64_t timeout_ms = 0) {
+        std::unique_lock<bthread::Mutex> lock(_mutex);
+        if (!_is_init || _is_closed) {
+            return Status::OK();
+        }
+        if (timeout_ms > 0) {
+            // Condition variables may wake spuriously, so re-check _is_closed in a loop.
+            // A single deadline keeps spurious wakeups from extending the total wait.
+            const auto deadline =
+                    std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
+            while (!_is_closed) {
+                const int64_t remaining_us =
+                        std::chrono::duration_cast<std::chrono::microseconds>(
+                                deadline - std::chrono::steady_clock::now())
+                                .count();
+                int ret = remaining_us > 0 ? _close_cv.wait_for(lock, remaining_us) : ETIMEDOUT;
+                if (ret != 0 && !_is_closed) {
+                    return Status::Error<true>(ret, "stream close_wait timeout");
+                }
+            }
+            return Status::OK();
+        }
+        while (!_is_closed) {
+            _close_cv.wait(lock);
+        }
+        return Status::OK();
+    }
+
+    std::shared_ptr<TabletSchema> tablet_schema(int64_t index_id) const {
+        return _tablet_schema_for_index->at(index_id);
+    }
+
+    bool enable_unique_mow(int64_t index_id) const {
+        return _enable_unique_mow_for_index->at(index_id);
+    }
+
+    std::vector<int64_t> success_tablets() {
+        std::lock_guard<bthread::Mutex> lock(_success_tablets_mutex);
+        return _success_tablets;
+    }
+
+    std::vector<int64_t> failed_tablets() {
+        std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex);
+        return _failed_tablets;
+    }
+
+    brpc::StreamId stream_id() const { return _stream_id; }
+
+    int64_t src_id() const { return _src_id; }
+
+    int64_t dst_id() const { return _dst_id; }
+
+private:
+    Status _encode_and_send(PStreamHeader& header, std::span<const Slice> data = {});
+    Status _send_with_retry(butil::IOBuf buf);
+
+protected:
+    // guarded by _mutex
+    bool _is_init = false;
+    bool _is_closed = false;
+    bthread::Mutex _mutex;
+    bthread::ConditionVariable _close_cv;
+
+    PUniqueId _load_id;
+    brpc::StreamId _stream_id;
+    int64_t _src_id = -1; // source backend_id
+    int64_t _dst_id = -1; // destination backend_id
+    LoadStreamReplyHandler _handler {this};
+
+    bthread::Mutex _success_tablets_mutex;
+    bthread::Mutex _failed_tablets_mutex;
+    std::vector<int64_t> _success_tablets;
+    std::vector<int64_t> _failed_tablets;
+
+    std::shared_ptr<IndexToTabletSchema> _tablet_schema_for_index;
+    std::shared_ptr<IndexToEnableMoW> _enable_unique_mow_for_index;
+};
+
+} // namespace doris
diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp
b/be/src/vec/sink/vtablet_sink_v2.cpp
index 243507db71..eb66eac664 100644
--- a/be/src/vec/sink/vtablet_sink_v2.cpp
+++ b/be/src/vec/sink/vtablet_sink_v2.cpp
@@ -39,7 +39,6 @@
#include "common/object_pool.h"
#include "common/status.h"
#include "exec/tablet_info.h"
-#include "io/fs/stream_sink_file_writer.h"
#include "olap/delta_writer_v2.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
@@ -55,6 +54,7 @@
#include "util/uid_util.h"
#include "vec/core/block.h"
#include "vec/exprs/vexpr.h"
+#include "vec/sink/load_stream_stub.h"
#include "vec/sink/vtablet_block_convertor.h"
#include "vec/sink/vtablet_finder.h"
@@ -63,76 +63,6 @@ class TExpr;
namespace stream_load {
-int StreamSinkHandler::on_received_messages(brpc::StreamId id, butil::IOBuf*
const messages[],
- size_t size) {
- int64_t backend_id = _sink->_node_id_for_stream->at(id);
-
- for (size_t i = 0; i < size; i++) {
- butil::IOBufAsZeroCopyInputStream wrapper(*messages[i]);
- PWriteStreamSinkResponse response;
- response.ParseFromZeroCopyStream(&wrapper);
-
- Status st = Status::create(response.status());
-
- std::stringstream ss;
- ss << "received response from backend " << backend_id << ", status: "
<< st
- << ", success tablet ids:";
- for (auto tablet_id : response.success_tablet_ids()) {
- ss << " " << tablet_id;
- }
- ss << ", failed tablet ids:";
- for (auto tablet_id : response.failed_tablet_ids()) {
- ss << " " << tablet_id;
- }
- LOG(INFO) << ss.str();
-
- int replica = _sink->_num_replicas;
-
- {
- std::lock_guard<bthread::Mutex>
l(_sink->_tablet_success_map_mutex);
- for (auto tablet_id : response.success_tablet_ids()) {
- if (_sink->_tablet_success_map.count(tablet_id) == 0) {
- _sink->_tablet_success_map.insert({tablet_id, {}});
- }
- _sink->_tablet_success_map[tablet_id].push_back(backend_id);
- }
- }
- {
- std::lock_guard<bthread::Mutex>
l(_sink->_tablet_failure_map_mutex);
- for (auto tablet_id : response.failed_tablet_ids()) {
- if (_sink->_tablet_failure_map.count(tablet_id) == 0) {
- _sink->_tablet_failure_map.insert({tablet_id, {}});
- }
- _sink->_tablet_failure_map[tablet_id].push_back(backend_id);
- if (_sink->_tablet_failure_map[tablet_id].size() * 2 >=
replica) {
- _sink->_cancel(Status::Cancelled(
- "Failed to meet num replicas requirements for
tablet {}", tablet_id));
- break;
- }
- }
- }
-
- if (response.has_load_stream_profile()) {
- TRuntimeProfileTree tprofile;
- const uint8_t* buf =
- reinterpret_cast<const
uint8_t*>(response.load_stream_profile().data());
- uint32_t len = response.load_stream_profile().size();
- auto status = deserialize_thrift_msg(buf, &len, false, &tprofile);
- if (status.ok()) {
- _sink->_state->load_channel_profile()->update(tprofile);
- } else {
- LOG(WARNING) << "load channel TRuntimeProfileTree deserialize
failed, errmsg="
- << status;
- }
- }
- }
- return 0;
-}
-
-void StreamSinkHandler::on_closed(brpc::StreamId id) {
- _sink->_pending_streams.fetch_add(-1);
-}
-
VOlapTableSinkV2::VOlapTableSinkV2(ObjectPool* pool, const RowDescriptor&
row_desc,
const std::vector<TExpr>& texprs, Status*
status)
: DataSink(row_desc), _pool(pool) {
@@ -210,7 +140,6 @@ Status VOlapTableSinkV2::prepare(RuntimeState* state) {
_close_timer = ADD_TIMER(_profile, "CloseWaitTime");
_close_writer_timer = ADD_CHILD_TIMER(_profile, "CloseWriterTime",
"CloseWaitTime");
_close_load_timer = ADD_CHILD_TIMER(_profile, "CloseLoadTime",
"CloseWaitTime");
- _close_stream_timer = ADD_CHILD_TIMER(_profile, "CloseStreamTime",
"CloseWaitTime");
// Prepare the exprs to run.
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state,
_row_desc));
@@ -224,8 +153,7 @@ Status VOlapTableSinkV2::open(RuntimeState* state) {
SCOPED_TIMER(_open_timer);
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
- _stream_pool_for_node = std::make_shared<StreamPoolForNode>();
- _node_id_for_stream = std::make_shared<NodeIdForStream>();
+ _stream_pool_for_node = std::make_shared<NodeToStreams>();
_delta_writer_for_tablet = std::make_shared<DeltaWriterForTablet>();
_build_tablet_node_mapping();
RETURN_IF_ERROR(_init_stream_pools());
@@ -234,71 +162,32 @@ Status VOlapTableSinkV2::open(RuntimeState* state) {
}
Status VOlapTableSinkV2::_init_stream_pools() {
+ // stub template is for sharing internal schema map among all stubs
+ LoadStreamStub stub_template {_load_id, _sender_id};
for (auto& [node_id, _] : _tablets_for_node) {
auto node_info = _nodes_info->find_node(node_id);
if (node_info == nullptr) {
return Status::InternalError("Unknown node {} in tablet location",
node_id);
}
- _stream_pool_for_node->insert({node_id, StreamPool {}});
- StreamPool& stream_pool = _stream_pool_for_node->at(node_id);
- RETURN_IF_ERROR(_init_stream_pool(*node_info, stream_pool));
- for (auto stream : stream_pool) {
- _node_id_for_stream->insert({stream, node_id});
- }
+ Streams& stream_pool = (*_stream_pool_for_node)[node_id];
+ RETURN_IF_ERROR(_init_stream_pool(*node_info, stream_pool,
stub_template));
}
return Status::OK();
}
-Status VOlapTableSinkV2::_init_stream_pool(const NodeInfo& node_info,
StreamPool& stream_pool) {
- DCHECK_GT(config::num_streams_per_sink, 0);
+Status VOlapTableSinkV2::_init_stream_pool(const NodeInfo& node_info, Streams&
stream_pool,
+ LoadStreamStub& stub_template) {
stream_pool.reserve(config::num_streams_per_sink);
for (int i = 0; i < config::num_streams_per_sink; ++i) {
- brpc::StreamOptions opt;
- opt.max_buf_size = 20 << 20; // 20MB
- opt.idle_timeout_ms = 30000;
- opt.messages_in_batch = 128;
- opt.handler = new StreamSinkHandler(this);
- brpc::StreamId stream;
- brpc::Controller cntl;
- if (int ret = StreamCreate(&stream, cntl, &opt)) {
- return Status::RpcError("Failed to create stream, code = {}", ret);
- }
- LOG(INFO) << "Created stream " << stream << " for backend " <<
node_info.id << " ("
- << node_info.host << ":" << node_info.brpc_port << ")";
- std::string host_port = get_host_port(node_info.host,
node_info.brpc_port);
- // use "pooled" connection to avoid conflicts between streaming rpc
and regular rpc,
- // see: https://github.com/apache/brpc/issues/392
- const auto& stub =
-
_state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache(
- host_port, "baidu_std", "pooled");
- POpenStreamSinkRequest request;
- *request.mutable_load_id() = _load_id;
- request.set_src_id(_sender_id);
- request.set_txn_id(_txn_id);
- request.set_enable_profile(_state->enable_profile());
- _schema->to_protobuf(request.mutable_schema());
- if (i == 0) {
- // get tablet schema from each backend only in the 1st stream
- for (auto& tablet : _indexes_from_node[node_info.id]) {
- auto req = request.add_tablets();
- *req = tablet;
- }
- }
- POpenStreamSinkResponse response;
- cntl.set_timeout_ms(config::open_stream_sink_timeout_ms);
- stub->open_stream_sink(&cntl, &request, &response, nullptr);
- for (const auto& resp : response.tablet_schemas()) {
- auto tablet_schema = std::make_shared<TabletSchema>();
- tablet_schema->init_from_pb(resp.tablet_schema());
- _tablet_schema_for_index[resp.index_id()] = tablet_schema;
- _enable_unique_mow_for_index[resp.index_id()] =
resp.enable_unique_key_merge_on_write();
- }
- if (cntl.Failed()) {
- return Status::InternalError("Failed to connect to backend {}:
{}", node_info.id,
- cntl.ErrorText());
- }
- stream_pool.push_back(stream);
- _pending_streams.fetch_add(1);
+ // internal tablet schema map will be shared among all stubs
+ auto stream = std::make_unique<LoadStreamStub>(stub_template);
+ // get tablet schema from each backend only in the 1st stream
+ const std::vector<PTabletID>& tablets_for_schema =
+ i == 0 ? _indexes_from_node[node_info.id] :
std::vector<PTabletID> {};
+
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(),
node_info,
+ _txn_id, *_schema, tablets_for_schema,
+ _state->enable_profile()));
+ stream_pool.emplace_back(std::move(stream));
}
return Status::OK();
}
@@ -343,13 +232,13 @@ void
VOlapTableSinkV2::_generate_rows_for_tablet(RowsForTablet& rows_for_tablet,
}
}
-Status VOlapTableSinkV2::_select_streams(int64_t tablet_id,
std::vector<brpc::StreamId>& streams) {
+Status VOlapTableSinkV2::_select_streams(int64_t tablet_id, Streams& streams) {
auto location = _location->find_tablet(tablet_id);
if (location == nullptr) {
return Status::InternalError("unknown tablet location, tablet id =
{}", tablet_id);
}
for (auto& node_id : location->node_ids) {
- streams.push_back(_stream_pool_for_node->at(node_id)[_stream_index]);
+
streams.emplace_back(_stream_pool_for_node->at(node_id)[_stream_index]);
}
_stream_index = (_stream_index + 1) % config::num_streams_per_sink;
return Status::OK();
@@ -359,8 +248,6 @@ Status VOlapTableSinkV2::send(RuntimeState* state,
vectorized::Block* input_bloc
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
Status status = Status::OK();
- LOG(INFO) << "upstream id = " << state->backend_id();
-
auto input_rows = input_block->rows();
auto input_bytes = input_block->bytes();
if (UNLIKELY(input_rows == 0)) {
@@ -408,7 +295,7 @@ Status VOlapTableSinkV2::send(RuntimeState* state,
vectorized::Block* input_bloc
// For each tablet, send its input_rows from block to delta writer
for (const auto& [tablet_id, rows] : rows_for_tablet) {
- std::vector<brpc::StreamId> streams;
+ Streams streams;
RETURN_IF_ERROR(_select_streams(tablet_id, streams));
RETURN_IF_ERROR(_write_memtable(block, tablet_id, rows, streams));
}
@@ -418,7 +305,7 @@ Status VOlapTableSinkV2::send(RuntimeState* state,
vectorized::Block* input_bloc
Status VOlapTableSinkV2::_write_memtable(std::shared_ptr<vectorized::Block>
block,
int64_t tablet_id, const Rows& rows,
- const std::vector<brpc::StreamId>&
streams) {
+ const Streams& streams) {
DeltaWriterV2* delta_writer = nullptr;
{
auto it = _delta_writer_for_tablet->find(tablet_id);
@@ -434,10 +321,6 @@ Status
VOlapTableSinkV2::_write_memtable(std::shared_ptr<vectorized::Block> bloc
req.tuple_desc = _output_tuple_desc;
req.is_high_priority = _is_high_priority;
req.table_schema_param = _schema.get();
- req.tablet_schema = _tablet_schema_for_index[rows.index_id];
- req.enable_unique_key_merge_on_write =
_enable_unique_mow_for_index[rows.index_id];
- req.sender_id = _sender_id;
- req.streams = streams;
for (auto& index : _schema->indexes()) {
if (index->index_id == rows.index_id) {
req.slots = &index->slots;
@@ -445,9 +328,8 @@ Status
VOlapTableSinkV2::_write_memtable(std::shared_ptr<vectorized::Block> bloc
break;
}
}
- DeltaWriterV2::open(&req, &delta_writer, _profile);
- _delta_writer_for_tablet->insert(
- {tablet_id, std::unique_ptr<DeltaWriterV2>(delta_writer)});
+ DeltaWriterV2::open(&req, streams, &delta_writer, _profile);
+ _delta_writer_for_tablet->emplace(tablet_id, delta_writer);
} else {
VLOG_DEBUG << "Reusing DeltaWriterV2 for Tablet(tablet id: " <<
tablet_id
<< ", index id: " << rows.index_id << ")";
@@ -472,11 +354,6 @@ Status VOlapTableSinkV2::_cancel(Status status) {
[&status](auto&& entry) {
entry.second->cancel_with_status(status); });
}
_delta_writer_for_tablet.reset();
- if (_stream_pool_for_node.use_count() == 1) {
- std::for_each(std::begin(*_node_id_for_stream),
std::end(*_node_id_for_stream),
- [](auto&& entry) { brpc::StreamClose(entry.first); });
- }
- _stream_pool_for_node.reset();
return Status::OK();
}
@@ -515,43 +392,35 @@ Status VOlapTableSinkV2::close(RuntimeState* state,
Status exec_status) {
{
// send CLOSE_LOAD to all streams, return ERROR if any
- RETURN_IF_ERROR(std::transform_reduce(
- std::begin(*_node_id_for_stream),
std::end(*_node_id_for_stream), Status::OK(),
- [](Status& left, Status&& right) { return left.ok() ?
right : left; },
- [this](auto&& entry) { return _close_load(entry.first);
}));
- }
-
- {
- SCOPED_TIMER(_close_load_timer);
- while (_pending_streams.load() > 0) {
- // TODO: use a better wait
- std::this_thread::sleep_for(std::chrono::milliseconds(1));
- LOG(INFO) << "sinkv2 close_wait, pending streams: " <<
_pending_streams.load();
+ for (const auto& [node_id, stream_pool] : *_stream_pool_for_node) {
+ RETURN_IF_ERROR(_close_load(stream_pool));
}
}
{
- SCOPED_TIMER(_close_stream_timer);
- // close streams
- if (_stream_pool_for_node.use_count() == 1) {
- std::for_each(std::begin(*_node_id_for_stream),
std::end(*_node_id_for_stream),
- [](auto&& entry) {
brpc::StreamClose(entry.first); });
+ SCOPED_TIMER(_close_load_timer);
+ for (const auto& [node_id, stream_pool] : *_stream_pool_for_node) {
+ for (const auto& stream : stream_pool) {
+ stream->close_wait();
+ }
}
- _stream_pool_for_node.reset();
}
std::vector<TTabletCommitInfo> tablet_commit_infos;
- for (auto& [tablet_id, backends] : _tablet_success_map) {
- for (int64_t be_id : backends) {
- TTabletCommitInfo commit_info;
- commit_info.tabletId = tablet_id;
- commit_info.backendId = be_id;
- tablet_commit_infos.emplace_back(std::move(commit_info));
+ for (const auto& [node_id, stream_pool] : *_stream_pool_for_node) {
+ for (const auto& stream : stream_pool) {
+ for (auto tablet_id : stream->success_tablets()) {
+ TTabletCommitInfo commit_info;
+ commit_info.tabletId = tablet_id;
+ commit_info.backendId = node_id;
+ tablet_commit_infos.emplace_back(std::move(commit_info));
+ }
}
}
state->tablet_commit_infos().insert(state->tablet_commit_infos().end(),
std::make_move_iterator(tablet_commit_infos.begin()),
std::make_move_iterator(tablet_commit_infos.end()));
+ _stream_pool_for_node.reset();
// _number_input_rows don't contain num_rows_load_filtered and
num_rows_load_unselected in scan node
int64_t num_rows_load_total = _number_input_rows +
state->num_rows_load_filtered() +
@@ -573,22 +442,17 @@ Status VOlapTableSinkV2::close(RuntimeState* state,
Status exec_status) {
return status;
}
-Status VOlapTableSinkV2::_close_load(brpc::StreamId stream) {
- butil::IOBuf buf;
- PStreamHeader header;
- *header.mutable_load_id() = _load_id;
- header.set_src_id(_sender_id);
- header.set_opcode(doris::PStreamHeader::CLOSE_LOAD);
- auto node_id = _node_id_for_stream.get()->at(stream);
+Status VOlapTableSinkV2::_close_load(const Streams& streams) {
+ auto node_id = streams[0]->dst_id();
+ std::vector<PTabletID> tablets_to_commit;
for (auto tablet : _tablets_for_node[node_id]) {
if (_tablet_finder->partition_ids().contains(tablet.partition_id())) {
- *header.add_tablets_to_commit() = tablet;
+ tablets_to_commit.push_back(tablet);
}
}
- size_t header_len = header.ByteSizeLong();
- buf.append(reinterpret_cast<uint8_t*>(&header_len), sizeof(header_len));
- buf.append(header.SerializeAsString());
- io::StreamSinkFileWriter::send_with_retry(stream, buf);
+ for (const auto& stream : streams) {
+ RETURN_IF_ERROR(stream->close_load(tablets_to_commit));
+ }
return Status::OK();
}
diff --git a/be/src/vec/sink/vtablet_sink_v2.h
b/be/src/vec/sink/vtablet_sink_v2.h
index c2c24a26fb..5f50463268 100644
--- a/be/src/vec/sink/vtablet_sink_v2.h
+++ b/be/src/vec/sink/vtablet_sink_v2.h
@@ -66,6 +66,7 @@
namespace doris {
class DeltaWriterV2;
+class LoadStreamStub;
class ObjectPool;
class RowDescriptor;
class RuntimeState;
@@ -81,8 +82,8 @@ class OlapTabletFinder;
class VOlapTableSinkV2;
using DeltaWriterForTablet = std::unordered_map<int64_t,
std::unique_ptr<DeltaWriterV2>>;
-using StreamPool = std::vector<brpc::StreamId>;
-using StreamPoolForNode = std::unordered_map<int64_t, StreamPool>;
+using Streams = std::vector<std::shared_ptr<LoadStreamStub>>;
+using NodeToStreams = std::unordered_map<int64_t, Streams>;
using NodeIdForStream = std::unordered_map<brpc::StreamId, int64_t>;
using NodePartitionTabletMapping =
std::unordered_map<int64_t, std::unordered_map<int64_t,
std::unordered_set<int64_t>>>;
@@ -135,7 +136,8 @@ public:
RuntimeProfile* profile() override { return _profile; }
private:
- Status _init_stream_pool(const NodeInfo& node_info, StreamPool&
stream_pool);
+ Status _init_stream_pool(const NodeInfo& node_info, Streams& stream_pool,
+ LoadStreamStub& stub_template);
Status _init_stream_pools();
@@ -146,11 +148,11 @@ private:
int row_idx);
Status _write_memtable(std::shared_ptr<vectorized::Block> block, int64_t
tablet_id,
- const Rows& rows, const
std::vector<brpc::StreamId>& streams);
+ const Rows& rows, const Streams& streams);
- Status _select_streams(int64_t tablet_id, std::vector<brpc::StreamId>&
streams);
+ Status _select_streams(int64_t tablet_id, Streams& streams);
- Status _close_load(brpc::StreamId stream);
+ Status _close_load(const Streams& streams);
Status _cancel(Status status);
@@ -176,8 +178,6 @@ private:
// TODO(zc): think about cache this data
std::shared_ptr<OlapTableSchemaParam> _schema;
- std::unordered_map<int64_t, std::shared_ptr<TabletSchema>>
_tablet_schema_for_index;
- std::unordered_map<int64_t, bool> _enable_unique_mow_for_index;
OlapTableLocationParam* _location = nullptr;
DorisNodesInfo* _nodes_info = nullptr;
@@ -206,7 +206,6 @@ private:
RuntimeProfile::Counter* _close_timer = nullptr;
RuntimeProfile::Counter* _close_writer_timer = nullptr;
RuntimeProfile::Counter* _close_load_timer = nullptr;
- RuntimeProfile::Counter* _close_stream_timer = nullptr;
// Save the status of close() method
Status _close_status;
@@ -221,8 +220,7 @@ private:
std::unordered_map<int64_t, std::vector<PTabletID>> _tablets_for_node;
std::unordered_map<int64_t, std::vector<PTabletID>> _indexes_from_node;
- std::shared_ptr<StreamPoolForNode> _stream_pool_for_node;
- std::shared_ptr<NodeIdForStream> _node_id_for_stream;
+ std::shared_ptr<NodeToStreams> _stream_pool_for_node;
size_t _stream_index = 0;
std::shared_ptr<DeltaWriterForTablet> _delta_writer_for_tablet;
diff --git a/be/test/io/fs/stream_sink_file_writer_test.cpp
b/be/test/io/fs/stream_sink_file_writer_test.cpp
index e894cbaf91..c6ac4a3f50 100644
--- a/be/test/io/fs/stream_sink_file_writer_test.cpp
+++ b/be/test/io/fs/stream_sink_file_writer_test.cpp
@@ -24,6 +24,7 @@
#include "olap/olap_common.h"
#include "util/debug/leakcheck_disabler.h"
#include "util/faststring.h"
+#include "vec/sink/load_stream_stub.h"
namespace doris {
@@ -35,134 +36,74 @@ namespace doris {
} while (false)
#endif
-DEFINE_string(connection_type, "", "Connection type. Available values: single,
pooled, short");
-DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
-DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
-DEFINE_int32(idle_timeout_s, -1,
- "Connection will be closed if there is no "
- "read/write operations during the last `idle_timeout_s'");
+constexpr int64_t LOAD_ID_LO = 1;
+constexpr int64_t LOAD_ID_HI = 2;
+constexpr int64_t NUM_STREAM = 3;
+constexpr int64_t PARTITION_ID = 1234;
+constexpr int64_t INDEX_ID = 2345;
+constexpr int64_t TABLET_ID = 3456;
+constexpr int32_t SEGMENT_ID = 4567;
+const std::string DATA0 = "segment data";
+const std::string DATA1 = "hello world";
+
+static std::atomic<int64_t> g_num_request;
class StreamSinkFileWriterTest : public testing::Test {
- class MockStreamSinkFileRecevier : public brpc::StreamInputHandler {
+ class MockStreamStub : public LoadStreamStub {
public:
- virtual int on_received_messages(brpc::StreamId id, butil::IOBuf*
const messages[],
- size_t size) {
- std::stringstream str;
- for (size_t i = 0; i < size; ++i) {
- str << "msg[" << i << "]=" << *messages[i];
+ MockStreamStub(PUniqueId load_id, int64_t src_id) :
LoadStreamStub(load_id, src_id) {};
+
+ virtual ~MockStreamStub() = default;
+
+ // APPEND_DATA
+ virtual Status append_data(int64_t partition_id, int64_t index_id,
int64_t tablet_id,
+ int64_t segment_id, std::span<const Slice>
data,
+ bool segment_eos = false) override {
+ EXPECT_EQ(PARTITION_ID, partition_id);
+ EXPECT_EQ(INDEX_ID, index_id);
+ EXPECT_EQ(TABLET_ID, tablet_id);
+ EXPECT_EQ(SEGMENT_ID, segment_id);
+ if (segment_eos) {
+ EXPECT_EQ(0, data.size());
+ } else {
+ EXPECT_EQ(2, data.size());
+ EXPECT_EQ(DATA0, data[0].to_string());
+ EXPECT_EQ(DATA1, data[1].to_string());
}
- LOG(INFO) << "Received from Stream=" << id << ": " << str.str();
- return 0;
- }
- virtual void on_idle_timeout(brpc::StreamId id) {
- LOG(INFO) << "Stream=" << id << " has no data transmission for a
while";
- }
- virtual void on_closed(brpc::StreamId id) { LOG(INFO) << "Stream=" <<
id << " is closed"; }
- };
-
- class StreamingSinkFileService : public PBackendService {
- public:
- StreamingSinkFileService() : _sd(brpc::INVALID_STREAM_ID) {}
- virtual ~StreamingSinkFileService() { brpc::StreamClose(_sd); };
- virtual void open_stream_sink(google::protobuf::RpcController*
controller,
- const POpenStreamSinkRequest*,
- POpenStreamSinkResponse* response,
- google::protobuf::Closure* done) {
- brpc::ClosureGuard done_guard(done);
-
- brpc::Controller* cntl =
static_cast<brpc::Controller*>(controller);
- brpc::StreamOptions stream_options;
- stream_options.handler = &_receiver;
- CHECK_EQ(0, brpc::StreamAccept(&_sd, *cntl, &stream_options));
- Status::OK().to_protobuf(response->mutable_status());
+ g_num_request++;
+ return Status::OK();
}
-
- private:
- MockStreamSinkFileRecevier _receiver;
- brpc::StreamId _sd;
};
public:
- StreamSinkFileWriterTest() { srand(time(nullptr)); }
- ~StreamSinkFileWriterTest() {}
+ StreamSinkFileWriterTest() = default;
+ ~StreamSinkFileWriterTest() = default;
protected:
virtual void SetUp() {
- // init channel
- brpc::Channel channel;
- brpc::ChannelOptions options;
- options.protocol = brpc::PROTOCOL_BAIDU_STD;
- options.connection_type = FLAGS_connection_type;
- options.timeout_ms = FLAGS_timeout_ms;
- options.max_retry = FLAGS_max_retry;
- std::stringstream port;
- CHECK_EQ(0, channel.Init("127.0.0.1:18946", nullptr));
-
- // init server
- _stream_service = new StreamingSinkFileService();
- CHECK_EQ(0, _server.AddService(_stream_service,
brpc::SERVER_DOESNT_OWN_SERVICE));
- brpc::ServerOptions server_options;
- server_options.idle_timeout_sec = FLAGS_idle_timeout_s;
- {
- debug::ScopedLeakCheckDisabler disable_lsan;
- CHECK_EQ(0, _server.Start("127.0.0.1:18946", &server_options));
+ _load_id.set_hi(LOAD_ID_HI);
+ _load_id.set_lo(LOAD_ID_LO);
+ for (int src_id = 0; src_id < NUM_STREAM; src_id++) {
+ _streams.emplace_back(new MockStreamStub(_load_id, src_id));
}
-
- // init stream connect
- PBackendService_Stub stub(&channel);
- brpc::Controller cntl;
- brpc::StreamId stream;
- CHECK_EQ(0, brpc::StreamCreate(&stream, cntl, NULL));
-
- POpenStreamSinkRequest request;
- POpenStreamSinkResponse response;
- request.mutable_load_id()->set_hi(1);
- request.mutable_load_id()->set_lo(1);
- stub.open_stream_sink(&cntl, &request, &response, NULL);
-
- brpc::Join(cntl.call_id());
- _stream = stream;
}
- virtual void TearDown() {
- CHECK_EQ(0, brpc::StreamClose(_stream));
- CHECK_EQ(0, _server.Stop(1000));
- CHECK_EQ(0, _server.Join());
- delete _stream_service;
- }
+ virtual void TearDown() {}
- StreamingSinkFileService* _stream_service;
- brpc::StreamId _stream;
- brpc::Server _server;
+ PUniqueId _load_id;
+ std::vector<std::shared_ptr<LoadStreamStub>> _streams;
};
-TEST_F(StreamSinkFileWriterTest, TestInit) {
- std::vector<brpc::StreamId> stream_ids {_stream};
- io::StreamSinkFileWriter writer(0, stream_ids);
- PUniqueId load_id;
- load_id.set_hi(1);
- load_id.set_lo(2);
- writer.init(load_id, 3, 4, 5, 6);
-}
+TEST_F(StreamSinkFileWriterTest, Test) {
+ g_num_request = 0;
+ io::StreamSinkFileWriter writer(_streams);
+ writer.init(_load_id, PARTITION_ID, INDEX_ID, TABLET_ID, SEGMENT_ID);
+ std::vector<Slice> slices {DATA0, DATA1};
-TEST_F(StreamSinkFileWriterTest, TestAppend) {
- std::vector<brpc::StreamId> stream_ids {_stream};
- io::StreamSinkFileWriter writer(0, stream_ids);
- PUniqueId load_id;
- load_id.set_hi(1);
- load_id.set_lo(2);
- writer.init(load_id, 3, 4, 5, 6);
- std::vector<Slice> slices {"hello"};
- CHECK_STATUS_OK(writer.appendv(&slices[0], slices.size()));
-}
-
-TEST_F(StreamSinkFileWriterTest, TestFinalize) {
- std::vector<brpc::StreamId> stream_ids {_stream};
- io::StreamSinkFileWriter writer(0, stream_ids);
- PUniqueId load_id;
- load_id.set_hi(1);
- load_id.set_lo(2);
- writer.init(load_id, 3, 4, 5, 6);
+ CHECK_STATUS_OK(writer.appendv(&(*slices.begin()), slices.size()));
+ EXPECT_EQ(NUM_STREAM, g_num_request);
CHECK_STATUS_OK(writer.finalize());
+ EXPECT_EQ(NUM_STREAM * 2, g_num_request);
}
+
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]