liaoxin01 commented on code in PR #62661:
URL: https://github.com/apache/doris/pull/62661#discussion_r3475262228
##########
fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java:
##########
@@ -539,6 +543,98 @@ public Map<String, Integer> getBeToInstancesNum() {
return result;
}
+ public static final class AdaptiveRandomBucketSinkContext {
+ private final List<Long> sinkBackendIds;
+ private final int planFragmentNum;
+
+ private AdaptiveRandomBucketSinkContext(List<Long> sinkBackendIds, int
planFragmentNum) {
+ this.sinkBackendIds = sinkBackendIds;
+ this.planFragmentNum = planFragmentNum;
+ }
+
+ public List<Long> getSinkBackendIds() {
+ return sinkBackendIds;
+ }
+
+ public int getPlanFragmentNum() {
+ return planFragmentNum;
+ }
+ }
+
+ public Optional<AdaptiveRandomBucketSinkContext>
getAdaptiveRandomBucketSinkContext(long tableId) {
+ Set<Long> sinkBackendIds = new TreeSet<>();
+ int planFragmentNum = 0;
+ for (PipelineExecContext context : pipelineExecContexts.values()) {
+ TPipelineFragmentParams params = context.rpcParams;
+ if (params.getFragment().getOutputSink() == null
+ || params.getFragment().getOutputSink().getType() !=
TDataSinkType.OLAP_TABLE_SINK) {
+ continue;
+ }
+ TOlapTableSink sink =
params.getFragment().getOutputSink().getOlapTableSink();
+ if (sink.getTableId() != tableId) {
+ continue;
+ }
+ if (!OlapTableSink.shouldAssignAdaptiveRandomBucket(sink)) {
+ continue;
+ }
+ sinkBackendIds.add(params.getBackendId());
+ planFragmentNum += params.getLocalParamsSize();
+ }
+ if (sinkBackendIds.isEmpty()) {
+ return Optional.empty();
+ }
+ return Optional.of(new AdaptiveRandomBucketSinkContext(
+ new ArrayList<>(sinkBackendIds), Math.max(planFragmentNum,
1)));
+ }
+
+ private static void assignAdaptiveRandomBucketForFragment(
+ Collection<TPipelineFragmentParams> fragmentParamsList) {
+ List<TPipelineFragmentParams> sinkParams = fragmentParamsList.stream()
+ .filter(param -> param.getFragment().getOutputSink() != null
+ && param.getFragment().getOutputSink().getType() ==
TDataSinkType.OLAP_TABLE_SINK)
+ .collect(Collectors.toList());
+ if (sinkParams.isEmpty()) {
+ return;
+ }
+ TOlapTableSink sink =
sinkParams.get(0).getFragment().getOutputSink().getOlapTableSink();
+ if (!OlapTableSink.shouldAssignAdaptiveRandomBucket(sink)) {
+ return;
+ }
+ List<Long> sinkBackendIds = sinkParams.stream()
+ .map(TPipelineFragmentParams::getBackendId)
+ .distinct()
+ .sorted()
+ .collect(Collectors.toList());
+ int planFragmentNum = sinkParams.stream()
Review Comment:
Suggest renaming planFragmentNum to sinkInstanceNum.
##########
be/src/exec/sink/writer/vtablet_writer.cpp:
##########
@@ -676,6 +716,77 @@ void VNodeChannel::_open_internal(bool is_incremental) {
request->set_txn_expiration(_parent->_txn_expiration);
request->set_write_file_cache(_parent->_write_file_cache);
+ if (_parent->_tablet_finder->is_adaptive_random_bucket()) {
Review Comment:
Consider extracting the larger is_adaptive_random_bucket() blocks into named
methods.
##########
be/src/load/channel/tablets_channel.h:
##########
@@ -185,6 +197,10 @@ class BaseTabletsChannel {
std::unordered_set<int64_t> _reducing_tablets;
std::unordered_set<int64_t> _partition_ids;
+ std::shared_ptr<AdaptiveRandomBucketState> _adaptive_random_bucket_state;
+ std::mutex _partition_route_locks_lock;
+ std::unordered_map<int32_t, std::unordered_map<int64_t,
std::shared_ptr<std::mutex>>>
Review Comment:
Please add a comment explaining this field.
##########
be/src/load/channel/tablets_channel.cpp:
##########
@@ -683,6 +728,169 @@ Status BaseTabletsChannel::_write_block_data(
return Status::OK();
}
+std::shared_ptr<std::mutex>
BaseTabletsChannel::_get_sender_partition_route_lock(
Review Comment:
Why is this lock needed?
##########
be/src/common/config.cpp:
##########
@@ -749,6 +749,8 @@ DEFINE_mInt32(memory_gc_sleep_time_ms, "500");
// max write buffer size before flush, default 200MB
DEFINE_mInt64(write_buffer_size, "209715200");
DEFINE_mBool(enable_adaptive_write_buffer_size, "true");
+// Whether random bucket load rotates to the next local bucket when memtable
flushes.
+DEFINE_mBool(enable_adaptive_random_bucket_load_bucket_rotation, "true");
Review Comment:
Do we need this config?
##########
be/src/load/channel/tablets_channel.cpp:
##########
@@ -702,10 +910,18 @@ Status TabletsChannel::add_batch(const
PTabletWriterAddBlockRequest& request,
return Status::OK();
}
+ if (request.is_receiver_side_random_bucket()) {
Review Comment:
Unify the naming: adaptive_random_bucket vs. receiver_side_random_bucket.
The same logical switch is renamed several times across the stack, which
makes it hard to tell whether these are the same concept or different ones:
Config.enable_adaptive_random_bucket_load (FE config)
→ TOlapTableSink.enable_adaptive_random_bucket (thrift)
→ FIND_TABLET_RANDOM_BUCKET / is_adaptive_random_bucket() (BE finder)
→ PTabletWriter*.is_receiver_side_random_bucket (proto / receiver)
##########
be/src/exec/sink/vtablet_finder.h:
##########
@@ -19,15 +19,45 @@
#include <cstdint>
#include <map>
+#include <memory>
+#include <mutex>
+#include <unordered_map>
+#include <vector>
#include "common/status.h"
#include "core/block/block.h"
#include "exec/common/hash_table/phmap_fwd_decl.h"
#include "storage/tablet_info.h"
#include "util/bitmap.h"
+#include "util/uid_util.h"
namespace doris {
+class AdaptiveRandomBucketState {
Review Comment:
AdaptiveRandomBucketState looks misplaced in vtablet_finder.h.
AdaptiveRandomBucketState is purely receiver-side state — it's only used by
BaseTabletsChannel. The sender-side OlapTabletFinder in this same header never
touches it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]