deardeng commented on code in PR #64167: URL: https://github.com/apache/doris/pull/64167#discussion_r3497797845
########## fe/fe-core/src/main/java/org/apache/doris/catalog/TenantLevelColocateTableIndex.java: ########## @@ -0,0 +1,1191 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.catalog; + +import org.apache.doris.common.DdlException; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.ModifyTenantLevelColocateMapInfo; +import org.apache.doris.persist.TenantLevelColocateData; +import org.apache.doris.persist.TenantLevelColocateGroupInfo; +import org.apache.doris.persist.TenantLevelColocateStableInfo; +import org.apache.doris.persist.TenantLevelColocateTableInfo; +import org.apache.doris.resource.Tag; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; +import com.google.common.collect.Table; +import 
com.google.common.collect.Table.Cell; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +/** + * maintain the tenant-level colocation table related indexes and meta + */ +public class TenantLevelColocateTableIndex implements Writable { + private static final Logger LOG = LogManager.getLogger(TenantLevelColocateTableIndex.class); + + // group_name -> tag-> group_id + private final Table<String, Tag, Long> groupName2Id = HashBasedTable.create(); + // group id -> group schema + private final Map<Long, TenantLevelColocateGroupSchema> group2Schema = Maps.newHashMap(); + // group_id -> bucketSeq -> backend ids + private final Map<Long, List<List<Long>>> group2BackendsPerBucketSeq = Maps.newHashMap(); + + // master data + // group_id -> table_ids + private final Multimap<Long, Long> masterGroup2Tables = ArrayListMultimap.create(); + // table_id -> group_id + private final Table<Long, Tag, Long> table2MasterGroup = HashBasedTable.create(); + + // the colocate group is unstable + private final Set<Long> unstableMasterGroups = Sets.newHashSet(); + // save some error msg of the group for show. 
no need to persist + private final Map<Long, String> masterGroup2ErrMsgs = Maps.newHashMap(); + + // slave data + // group_id -> table_ids + private final Multimap<Long, Long> slaveGroup2Tables = ArrayListMultimap.create(); + // table_id -> group_id + private final Table<Long, Tag, Long> table2SlaveGroup = HashBasedTable.create(); + // the colocate group is unstable + private final Set<Long> unstableSlaveGroups = Sets.newHashSet(); + // save some error msg of the group for show. no need to persist + private final Map<Long, String> slaveGroup2ErrMsgs = Maps.newHashMap(); + + private final transient ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + + public TenantLevelColocateTableIndex() { + + } + + private void readLock() { + this.lock.readLock().lock(); + } + + private void readUnlock() { + this.lock.readLock().unlock(); + } + + private void writeLock() { + this.lock.writeLock().lock(); + } + + private void writeUnlock() { + this.lock.writeLock().unlock(); + } + + // NOTICE: call 'addTableToGroup()' will not modify 'group2BackendsPerBucketSeq' + // 'group2BackendsPerBucketSeq' need to be set manually before or after, if necessary. 
+ public TenantLevelColocateGroupSchema addTableToMasterGroup(OlapTable tbl, String groupName, Tag tag, + Long assignedGroupId) { + writeLock(); + try { + TenantLevelColocateGroupSchema groupSchema; + if (groupName2Id.contains(groupName, tag)) { + Long groupId = groupName2Id.get(groupName, tag); + groupSchema = group2Schema.get(groupId); + Preconditions.checkNotNull(groupSchema); + } else { + final long groupId; + if (assignedGroupId != null) { + // use the given group id, eg, in replay process + groupId = assignedGroupId; + } else { + // generate a new one + groupId = Env.getCurrentEnv().getNextId(); + } + HashDistributionInfo distributionInfo = (HashDistributionInfo) tbl.getDefaultDistributionInfo(); + ReplicaAllocation tblReplicaAlloc = tbl.getDefaultReplicaAllocation(); + groupSchema = new TenantLevelColocateGroupSchema(groupId, groupName, tag, + distributionInfo.getDistributionColumns(), distributionInfo.getBucketNum(), + tblReplicaAlloc.getReplicaNumByTag(tag)); + groupName2Id.put(groupName, tag, groupId); + group2Schema.put(groupId, groupSchema); + masterGroup2ErrMsgs.put(groupId, ""); + } + masterGroup2Tables.put(groupSchema.getGroupId(), tbl.getId()); + table2MasterGroup.put(tbl.getId(), tag, groupSchema.getGroupId()); + return groupSchema; + } finally { + writeUnlock(); + } + } + + public void addBackendsPerBucketSeq(long groupId, List<List<Long>> backendsPerBucketSeq) { + writeLock(); + try { + group2BackendsPerBucketSeq.put(groupId, backendsPerBucketSeq); + } finally { + writeUnlock(); + } + } + + public boolean addBackendsPerBucketSeq(long groupId, List<List<Long>> backendsPerBucketSeq, + ReplicaAllocation originReplicaAlloc) { + writeLock(); + try { + TenantLevelColocateGroupSchema groupSchema = group2Schema.get(groupId); + // replica allocation has outdate + if (groupSchema != null && !originReplicaAlloc.equals(groupSchema.getReplicaAlloc())) { + LOG.info("replica allocation has outdate for group {}, old replica alloc {}, new replica alloc {}", + 
groupId, originReplicaAlloc.getAllocMap(), groupSchema.getReplicaAlloc()); + return false; + } + group2BackendsPerBucketSeq.put(groupId, backendsPerBucketSeq); + return true; + } finally { + writeUnlock(); + } + } + + public void markMasterGroupUnstable(long groupId, String reason, boolean needEditLog) { + writeLock(); + try { + if (!masterGroup2Tables.containsKey(groupId)) { + return; + } + if (unstableMasterGroups.add(groupId)) { + if (needEditLog) { + TenantLevelColocateStableInfo info = new TenantLevelColocateStableInfo( + Collections.singleton(groupId)); + Env.getCurrentEnv().getEditLog().logTenantLevelColocateMarkMasterUnstable(info); + } + LOG.info("mark group {} as unstable, reason:{}", groupId, reason); + } + //update unstable reason every time not just when it was first added to group + if (unstableMasterGroups.contains(groupId)) { + masterGroup2ErrMsgs.put(groupId, Strings.nullToEmpty(reason)); + } + } finally { + writeUnlock(); + } + } + + public void markMasterGroupUnstable(Set<Long> groups, String reason, boolean needEditLog) { + Set<Long> added = new HashSet<>(); + writeLock(); + try { + for (Long groupId : groups) { + if (!masterGroup2Tables.containsKey(groupId)) { + continue; + } + if (unstableMasterGroups.add(groupId)) { + added.add(groupId); + } + //update unstable reason every time not just when it was first added to group + if (unstableMasterGroups.contains(groupId)) { + masterGroup2ErrMsgs.put(groupId, Strings.nullToEmpty(reason)); + } + } + LOG.info("mark group {} as unstable", groups); + } finally { + writeUnlock(); + } + if (needEditLog) { Review Comment: Why does the batch variant write edit log after releasing the index write lock, while the single-group variant writes it under the lock? If another thread marks the same group stable between unlock and log write, journal order can diverge from the in-memory transition order, causing replay to end with a different stable state. 
Please either perform the in-memory mutation and the edit-log write under the same lock, or document (and justify) why batch calls with needEditLog=true cannot race with concurrent stable/unstable updates. Also, skip the edit-log write entirely when `added` is empty, since such an entry records no state change. Relevant lines in the PR diff: L187 and L223. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
