morningman closed pull request #246: Colocate Join (#245)
URL: https://github.com/apache/incubator-doris/pull/246
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/docs/help/Contents/Data Definition/ddl_stmt.md b/docs/help/Contents/Data Definition/ddl_stmt.md
index 2fd8eff2..12a1968d 100644
--- a/docs/help/Contents/Data Definition/ddl_stmt.md    
+++ b/docs/help/Contents/Data Definition/ddl_stmt.md    
@@ -200,6 +200,11 @@
            PROPERTIES (
            "bloom_filter_columns"="k1,k2,k3"
            )
+        4) To use the Colocate Join feature, you need to specify it in PROPERTIES:
+
+           PROPERTIES (
+           "colocate_with"="table1"
+           )
     
 ## example
    1. Create an olap table using Random bucketing and columnar storage, aggregating records with the same key
@@ -317,6 +322,28 @@
         DISTRIBUTED BY RANDOM BUCKETS 32
         PROPERTIES ("storage_type"="column");
 
+    7. Create two tables t1 and t2 that support Colocate Join
+        CREATE TABLE `t1` (
+        `id` int(11) COMMENT "",
+        `value` varchar(8) COMMENT ""
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        DISTRIBUTED BY HASH(`id`) BUCKETS 10
+        PROPERTIES (
+        "colocate_with" = "t1"
+        );
+
+        CREATE TABLE `t2` (
+        `id` int(11) COMMENT "",
+        `value` varchar(8) COMMENT ""
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        DISTRIBUTED BY HASH(`id`) BUCKETS 10
+        PROPERTIES (
+        "colocate_with" = "t1"
+        );
+
+
 ## keyword
     CREATE,TABLE
         
@@ -456,7 +483,7 @@
            1) All columns of the index must be listed
            2) value columns must come after key columns
            
-    6. Modify table properties; currently only the bloom filter columns can be modified
+    6. Modify table properties; currently the bloom filter columns and the colocate_with property can be modified
        Syntax:
            PROPERTIES ("key"="value")
        Note:
@@ -563,6 +590,9 @@
         ALTER TABLE example_db.my_table
         DROP COLUMN col2
         PROPERTIES ("bloom_filter_columns"="k1,k2,k3");
+
+    12. Modify the table's Colocate property
+        ALTER TABLE example_db.my_table set ("colocate_with"="t1");
         
     [rename]
     1. 将名为 table1 的表修改为 table2
@@ -987,3 +1017,76 @@
 ## keyword
     TRUNCATE,TABLE
 
+# Colocate Join
+## description
+    Colocate/Local Join means that a join across multiple nodes involves no data movement or network transfer:
+    each node joins only its local data. The prerequisite for a local join is that rows with the same Join Key
+    are loaded onto fixed nodes according to the same rule at load time.
+
+    1 How To Use:
+
+        Simply add the colocate_with property when creating a table. Its value can be set to any table
+        in the same colocate group, but the table named in colocate_with must be created first.
+
+        For example, to enable Colocate Join between tables t1 and t2, create them with the following statements:
+
+            CREATE TABLE `t1` (
+            `id` int(11) COMMENT "",
+            `value` varchar(8) COMMENT ""
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 10
+            PROPERTIES (
+            "colocate_with" = "t1"
+            );
+
+            CREATE TABLE `t2` (
+            `id` int(11) COMMENT "",
+            `value` varchar(8) COMMENT ""
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 10
+            PROPERTIES (
+            "colocate_with" = "t1"
+            );
+
+    2 Current limitations of Colocate Join:
+
+        1. A Colocate table must be an OLAP table
+        2. Tables with the same colocate_with property must have the same number of BUCKETS
+        3. Tables with the same colocate_with property must have the same replication number
+        4. Tables with the same colocate_with property must have the same data types for their DISTRIBUTED BY columns
+
+    3 Suitable scenarios for Colocate Join:
+
+        Colocate Join fits well when several tables are bucketed by the same columns and are frequently joined on those columns.
+
+    4 FAQ:
+
+        Q: Is Colocate Join supported across more than two tables?
+
+        A: Yes
+
+        Q: Can a Colocate table be joined with a normal table?
+
+        A: Yes
+
+        Q: Can Colocate tables be joined on non-bucketing keys?
+
+        A: Yes. A join that does not satisfy the Colocate Join conditions falls back to Shuffle Join or Broadcast Join
+
+        Q: How can I tell whether a join was executed as a Colocate Join?
+
+        A: If, in the explain output, the children of the Hash Join node are OlapScanNodes directly, with no Exchange Node in between, the join is a Colocate Join
+
+        Q: How do I modify the colocate_with property?
+
+        A: ALTER TABLE example_db.my_table set ("colocate_with"="target_table");
+
+        Q: How do I disable colocate join?
+
+        A: set disable_colocate_join = true; disables Colocate Join, and queries will then use Shuffle Join or Broadcast Join
+
+## keyword
+
+    COLOCATE, JOIN, CREATE TABLE
+
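The bucket-count and distribution-type constraints above follow from how bucketing works: both tables hash the distribution column and take the result modulo the bucket count, so matching keys land in the same bucket index only when both parameters agree. A standalone sketch of the idea (hypothetical code with a simplified hash, not Doris's actual bucketing function):

```java
// Hypothetical illustration, not Doris code: rows are assigned to buckets by
// hashing the distribution column and taking the result modulo BUCKETS.
public class BucketDemo {
    // Simplified stand-in for hash bucketing.
    static int bucketFor(long key, int bucketNum) {
        return Math.floorMod(Long.hashCode(key), bucketNum);
    }

    public static void main(String[] args) {
        // t1 and t2: DISTRIBUTED BY HASH(`id`) BUCKETS 10
        long id = 12345L;
        // Same key + same bucket count -> same bucket index in both tables,
        // so the matching rows sit on the same node and can join locally.
        System.out.println(bucketFor(id, 10) == bucketFor(id, 10)); // true
        // With different bucket counts the alignment breaks for some keys,
        // which is why colocated tables must share the same BUCKETS number.
        System.out.println(bucketFor(12L, 10) == bucketFor(12L, 16)); // false
    }
}
```

The same reasoning explains the distribution-column type constraint: a value that hashes differently in the two tables cannot be routed to aligned buckets.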
diff --git a/fe/src/main/java/org/apache/doris/alter/DecommissionBackendJob.java b/fe/src/main/java/org/apache/doris/alter/DecommissionBackendJob.java
index 6938879b..a9ee045f 100644
--- a/fe/src/main/java/org/apache/doris/alter/DecommissionBackendJob.java
+++ b/fe/src/main/java/org/apache/doris/alter/DecommissionBackendJob.java
@@ -248,6 +248,11 @@ public synchronized boolean sendTasks() {
                         }
                         OlapTable olapTable = (OlapTable) table;
 
+                        if (olapTable.getColocateTable() != null) {
+                            LOG.debug("{} is colocate table, skip", olapTable.getName());
+                            continue;
+                        }
+
                         long partitionId = tabletMeta.getPartitionId();
                         Partition partition = olapTable.getPartition(partitionId);
                         if (partition == null) {
@@ -431,6 +436,12 @@ public synchronized int tryFinishJob() {
                             continue;
                         }
 
+                        if (olapTable.getColocateTable() != null) {
+                            LOG.debug("{} is colocate table, ColocateTableBalancer will handle the tablet clone", olapTable.getName());
+                            finishedTabletIds.add(tabletId);
+                            continue;
+                        }
+
                         // get replication num
                         PartitionInfo partitionInfo = olapTable.getPartitionInfo();
                         short replicationNum = partitionInfo.getReplicationNum(partitionId);
diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index fc14cd68..1ef8741a 100644
--- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -1257,6 +1257,12 @@ public void process(List<AlterClause> alterClauses, String clusterName, Database
                 } else {
                     throw new DdlException("reduplicated PROPERTIES");
                 }
+
+                if (properties.containsKey(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH)) {
+                    String colocateTable = properties.get(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH);
+                    Catalog.getInstance().modifyTableColocate(db, olapTable, colocateTable);
+                    return;
+                }
             }
 
             if (alterClause instanceof AddColumnClause) {
diff --git a/fe/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java b/fe/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
index 7766211c..183251c6 100644
--- a/fe/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
+++ b/fe/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
@@ -19,6 +19,7 @@
 
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.util.PrintableMap;
@@ -31,6 +32,7 @@
 public class ModifyTablePropertiesClause extends AlterClause {
 
     private static final String KEY_STORAGE_TYPE = "storage_type";
+    private static final String KEY_COLOCATE_WITH = "colocate_with";
 
     private Map<String, String> properties;
 
@@ -44,9 +46,19 @@ public void analyze(Analyzer analyzer) throws AnalysisException {
             throw new AnalysisException("Properties is not set");
         }
 
-        if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ALTER)) {
+        if (properties.size() == 1 && properties.containsKey(KEY_COLOCATE_WITH)) {
+            if (Config.disable_colocate_join) {
+                throw new AnalysisException("Colocate table is disabled by Admin");
+            }
+
+            if (!Catalog.getCurrentCatalog().getAuth().checkDbPriv(
+                    ConnectContext.get(), ConnectContext.get().getDatabase(), PrivPredicate.ALTER)) {
+                ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
+                        "ALTER");
+            }
+        } else if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ALTER)) {
             ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
-                                                "ALTER");
+                    "ALTER");
         }
 
         if (properties.containsKey(KEY_STORAGE_TYPE)) {
diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
index c354ea2a..4415e24f 100644
--- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -81,6 +81,7 @@
 import org.apache.doris.catalog.Table.TableType;
 import org.apache.doris.clone.Clone;
 import org.apache.doris.clone.CloneChecker;
+import org.apache.doris.clone.ColocateTableBalancer;
 import org.apache.doris.cluster.BaseParam;
 import org.apache.doris.cluster.Cluster;
 import org.apache.doris.cluster.ClusterNamespace;
@@ -133,6 +134,7 @@
 import org.apache.doris.mysql.privilege.UserPropertyMgr;
 import org.apache.doris.persist.BackendIdsUpdateInfo;
 import org.apache.doris.persist.ClusterInfo;
+import org.apache.doris.persist.ColocatePersistInfo;
 import org.apache.doris.persist.DatabaseInfo;
 import org.apache.doris.persist.DropInfo;
 import org.apache.doris.persist.DropLinkDbAndUpdateDbInfo;
@@ -146,6 +148,7 @@
 import org.apache.doris.persist.StorageInfo;
 import org.apache.doris.persist.TableInfo;
 import org.apache.doris.persist.TruncateTableInfo;
+import org.apache.doris.persist.TablePropertyInfo;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.JournalObservable;
 import org.apache.doris.qe.SessionVariable;
@@ -310,6 +313,7 @@
 
     private SystemInfoService systemInfo;
     private TabletInvertedIndex tabletInvertedIndex;
+    private ColocateTableIndex colocateTableIndex;
 
     private CatalogRecycleBin recycleBin;
     private FunctionSet functionSet;
@@ -361,6 +365,10 @@ public TabletInvertedIndex getTabletInvertedIndex() {
         return this.tabletInvertedIndex;
     }
 
+    public ColocateTableIndex getColocateTableIndex() {
+        return this.colocateTableIndex;
+    }
+
     private CatalogRecycleBin getRecycleBin() {
         return this.recycleBin;
     }
@@ -408,6 +416,7 @@ private Catalog() {
 
         this.systemInfo = new SystemInfoService();
         this.tabletInvertedIndex = new TabletInvertedIndex();
+        this.colocateTableIndex = new ColocateTableIndex();
         this.recycleBin = new CatalogRecycleBin();
         this.functionSet = new FunctionSet();
         this.functionSet.init();
@@ -488,6 +497,11 @@ public static TabletInvertedIndex getCurrentInvertedIndex() {
         return getCurrentCatalog().getTabletInvertedIndex();
     }
 
+    // use this to get correct ColocateTableIndex instance
+    public static ColocateTableIndex getCurrentColocateIndex() {
+        return getCurrentCatalog().getColocateTableIndex();
+    }
+
     public static CatalogRecycleBin getCurrentRecycleBin() {
         return getCurrentCatalog().getRecycleBin();
     }
@@ -1012,6 +1026,12 @@ private void transferToMaster() throws IOException {
         CloneChecker.getInstance().setInterval(Config.clone_checker_interval_second * 1000L);
         CloneChecker.getInstance().start();
 
+        // Colocate tables balancer
+        if (!Config.disable_colocate_join) {
+            ColocateTableBalancer.getInstance().setInterval(60 * 1000L);
+            ColocateTableBalancer.getInstance().start();
+        }
+
         // Publish Version Daemon
         publishVersionDaemon.start();
 
@@ -1233,6 +1253,7 @@ public void loadImage(String imageDir) throws IOException, DdlException {
             if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_45) {
                 checksum = loadTransactionState(dis, checksum);
             }
+            checksum = loadColocateTableIndex(dis, checksum);
 
             long remoteChecksum = dis.readLong();
             Preconditions.checkState(remoteChecksum == checksum, remoteChecksum + " vs. " + checksum);
@@ -1628,6 +1649,13 @@ public long loadRecycleBin(DataInputStream dis, long checksum) throws IOExceptio
         return checksum;
     }
 
+    public long loadColocateTableIndex(DataInputStream dis, long checksum) throws IOException {
+        if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_46) {
+            Catalog.getCurrentColocateIndex().readFields(dis);
+        }
+        return checksum;
+    }
+
     // Only called by checkpoint thread
     public void saveImage() throws IOException {
         // Write image.ckpt
@@ -1671,6 +1699,7 @@ public void saveImage(File curFile, long replayedJournalId) throws IOException {
             checksum = saveBackupHandler(dos, checksum);
             checksum = savePaloAuth(dos, checksum);
             checksum = saveTransactionState(dos, checksum);
+            checksum = saveColocateTableIndex(dos, checksum);
             dos.writeLong(checksum);
         } finally {
             dos.close();
@@ -1897,6 +1926,11 @@ public long saveRecycleBin(DataOutputStream dos, long checksum) throws IOExcepti
         return checksum;
     }
 
+    public long saveColocateTableIndex(DataOutputStream dos, long checksum) throws IOException {
+        Catalog.getCurrentColocateIndex().write(dos);
+        return checksum;
+    }
+
     // global variable persistence
     public long loadGlobalVariable(DataInputStream in, long checksum) throws IOException, DdlException {
         if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_22) {
@@ -2750,6 +2784,11 @@ public void addPartition(Database db, String tableName, AddPartitionClause addPa
                 distributionInfo = defaultDistributionInfo;
             }
 
+            if (olapTable.getColocateTable() != null) {
+                ColocateTableUtils.checkReplicationNum(rangePartitionInfo, singlePartitionDesc.getReplicationNum());
+                ColocateTableUtils.checkBucketNum(olapTable.getDefaultDistributionInfo(), distributionInfo);
+            }
+
             indexIdToShortKeyColumnCount = olapTable.getCopiedIndexIdToShortKeyColumnCount();
             indexIdToSchemaHash = olapTable.getCopiedIndexIdToSchemaHash();
             indexIdToStorageType = olapTable.getCopiedIndexIdToStorageType();
@@ -3018,6 +3057,8 @@ public void modifyPartition(Database db, OlapTable olapTable, ModifyPartitionCla
 
         if (newReplicationNum == oldReplicationNum) {
             newReplicationNum = (short) -1;
+        } else if (olapTable.getColocateTable() != null) {
+            ErrorReport.reportDdlException(ErrorCode.ERR_COLOCATE_TABLE_MUST_SAME_REPLICAT_NUM, oldReplicationNum);
         }
 
         // check if has other undefined properties
@@ -3270,6 +3311,37 @@ private Table createOlapTable(Database db, CreateTableStmt stmt, boolean isResto
             throw new DdlException(e.getMessage());
         }
 
+        // colocateTable
+        try {
+            String colocateTable = PropertyAnalyzer.analyzeColocate(properties);
+            if (colocateTable != null) {
+                if (Config.disable_colocate_join) {
+                    ErrorReport.reportDdlException(ErrorCode.ERR_COLOCATE_TABLE_DISABLED);
+                }
+
+                Table parentTable = ColocateTableUtils.getColocateTable(db, colocateTable);
+                //for colocate child table
+                if (!colocateTable.equalsIgnoreCase(tableName)) {
+                    ColocateTableUtils.checkTableExist(parentTable, colocateTable);
+
+                    ColocateTableUtils.checkTableType(parentTable);
+
+                    ColocateTableUtils.checkBucketNum((OlapTable) parentTable, distributionInfo);
+
+                    ColocateTableUtils.checkDistributionColumnSizeAndType((OlapTable) parentTable, distributionInfo);
+
+                    getColocateTableIndex().addTableToGroup(db.getId(), tableId, parentTable.getId());
+                } else {
+                    getColocateTableIndex().addTableToGroup(db.getId(), tableId, tableId);
+                }
+
+                olapTable.setColocateTable(colocateTable);
+
+            }
+        } catch (AnalysisException e) {
+            throw new DdlException(e.getMessage());
+        }
+
         // set index schema
         int schemaVersion = 0;
         try {
@@ -3318,6 +3390,10 @@ private Table createOlapTable(Database db, CreateTableStmt stmt, boolean isResto
                 short replicationNum = FeConstants.default_replication_num;
                 try {
                     replicationNum = PropertyAnalyzer.analyzeReplicationNum(properties, replicationNum);
+                    if (olapTable.getColocateTable() != null && !olapTable.getColocateTable().equalsIgnoreCase(tableName)) {
+                        Table parentTable = ColocateTableUtils.getColocateTable(db, olapTable.getColocateTable());
+                        ColocateTableUtils.checkReplicationNum(((OlapTable)parentTable).getPartitionInfo(), replicationNum);
+                    }
                 } catch (AnalysisException e) {
                     throw new DdlException(e.getMessage());
                 }
@@ -3344,6 +3420,10 @@ private Table createOlapTable(Database db, CreateTableStmt stmt, boolean isResto
                     // and then check if there still has unknown properties
                     PropertyAnalyzer.analyzeDataProperty(stmt.getProperties(), DataProperty.DEFAULT_HDD_DATA_PROPERTY);
                     PropertyAnalyzer.analyzeReplicationNum(properties, FeConstants.default_replication_num);
+                    if (olapTable.getColocateTable() != null && !olapTable.getColocateTable().equalsIgnoreCase(tableName)) {
+                        Table parentTable = ColocateTableUtils.getColocateTable(db, olapTable.getColocateTable());
+                        ColocateTableUtils.checkReplicationNum((OlapTable)parentTable, partitionInfo);
+                    }
 
                     if (properties != null && !properties.isEmpty()) {
                         // here, all properties should be checked
@@ -3385,12 +3465,36 @@ private Table createOlapTable(Database db, CreateTableStmt stmt, boolean isResto
                 if (!db.createTableWithLock(olapTable, false, stmt.isSetIfNotExists())) {
                     ErrorReport.reportDdlException(ErrorCode.ERR_CANT_CREATE_TABLE, tableName, "table already exists");
                 }
+
+                //we have added these index to memory, only need to persist here
+                if (olapTable.getColocateTable() != null) {
+                    Long groupId = ColocateTableUtils.getColocateTable(db, olapTable.getColocateTable()).getId();
+                    ColocatePersistInfo info;
+                    if (getColocateTableIndex().isColocateParentTable(tableId)) {
+                        List<List<Long>> backendsPerBucketSeq = getColocateTableIndex().getBackendsPerBucketSeq(groupId);
+                        info = ColocatePersistInfo.CreateForAddTable(tableId, groupId, db.getId(), backendsPerBucketSeq);
+                    } else {
+                        info = ColocatePersistInfo.CreateForAddTable(tableId, groupId, db.getId(), new ArrayList<>());
+                    }
+                    editLog.logColocateAddTable(info);
+                }
+
+
                 LOG.info("successfully create table[{};{}]", tableName, tableId);
             }
         } catch (DdlException e) {
             for (Long tabletId : tabletIdSet) {
                 Catalog.getCurrentInvertedIndex().deleteTablet(tabletId);
             }
+
+            //only remove from memory, because we have not persist it
+            if (olapTable.getColocateTable() != null) {
+                getColocateTableIndex().removeTable(tableId);
+
+                if (getColocateTableIndex().isColocateParentTable(tableId)) {
+                    getColocateTableIndex().removeBackendsPerBucketSeq(tableId);
+                }
+            }
+
             throw e;
         }
 
@@ -3654,6 +3758,13 @@ public static void getDdlStmt(Table table, List<String> createTableStmt, List<St
                 sb.append(replicationNum).append("\"");
             }
 
+            // 5. colocateTable
+            String colocateTable = olapTable.getColocateTable();
+            if (colocateTable != null) {
+                sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH).append("\" = \"");
+                sb.append(colocateTable).append("\"");
+            }
+
             sb.append("\n);");
         } else if (table.getType() == TableType.MYSQL) {
             MysqlTable mysqlTable = (MysqlTable) table;
@@ -3837,6 +3948,21 @@ private void createTablets(String clusterName, MaterializedIndex index, ReplicaS
 
         DistributionInfoType distributionInfoType = distributionInfo.getType();
         if (distributionInfoType == DistributionInfoType.RANDOM || distributionInfoType == DistributionInfoType.HASH) {
+
+            ColocateTableIndex colocateIndex = Catalog.getCurrentColocateIndex();
+            List<List<Long>> backendsPerBucketSeq = new ArrayList<>();
+            if (colocateIndex.isColocateTable(tabletMeta.getTableId())) {
+                Database db = Catalog.getInstance().getDb(tabletMeta.getDbId());
+                long groupId = colocateIndex.getGroup(tabletMeta.getTableId());
+                //Use db write lock here to make sure the backendsPerBucketSeq is consistent when the backendsPerBucketSeq is updating.
+                //This lock will release very fast.
+                db.writeLock();
+                try {
+                    backendsPerBucketSeq = colocateIndex.getBackendsPerBucketSeq(groupId);
+                } finally {
+                    db.writeUnlock();
+                }
+            }
             for (int i = 0; i < distributionInfo.getBucketNum(); ++i) {
                 // create a new tablet with random chosen backends
                 Tablet tablet = new Tablet(getNextId());
@@ -3845,13 +3971,32 @@ private void createTablets(String clusterName, MaterializedIndex index, ReplicaS
                 index.addTablet(tablet, tabletMeta);
                 tabletIdSet.add(tablet.getId());
 
-                // create replicas for tablet with random chosen backends
-                List<Long> chosenBackendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIds(replicationNum, true,
-                        true, clusterName);
-                if (chosenBackendIds == null) {
-                    throw new DdlException("Failed to find " + replicationNum + " different hosts to create table");
+                //get BackendIds
+                List<Long> chosenBackendIds;
+
+                //for colocate parent table
+                if (colocateIndex.isColocateParentTable(tabletMeta.getTableId())) {
+                    if (backendsPerBucketSeq.size() == distributionInfo.getBucketNum()) {
+                        //for not first partitions of colocate parent table
+                        chosenBackendIds = backendsPerBucketSeq.get(i);
+                    } else {
+                        //for the first partitions of colocate parent table
+                        chosenBackendIds = chosenBackendIdBySeq(replicationNum, clusterName);
+                        backendsPerBucketSeq.add(chosenBackendIds);
+
+                        if (i == distributionInfo.getBucketNum() - 1) {
+                            //delay persist this until we ensure the table create successfully
+                            colocateIndex.addBackendsPerBucketSeq(tabletMeta.getTableId(), backendsPerBucketSeq);
+                        }
+                    }
+                } else if (colocateIndex.isColocateTable(tabletMeta.getTableId())) {
+                    //for colocate child table
+                    chosenBackendIds = backendsPerBucketSeq.get(i);
+                } else {
+                    //for normal table
+                    chosenBackendIds = chosenBackendIdBySeq(replicationNum, clusterName);
                 }
-                Preconditions.checkState(chosenBackendIds.size() == replicationNum);
+
                 for (long backendId : chosenBackendIds) {
                     long replicaId = getNextId();
                     Replica replica = new Replica(replicaId, backendId, replicaState, version, versionHash);
@@ -3863,6 +4008,16 @@ private void createTablets(String clusterName, MaterializedIndex index, ReplicaS
         }
     }
 
+    // create replicas for tablet with random chosen backends
+    private List<Long> chosenBackendIdBySeq(int replicationNum, String clusterName) throws DdlException {
+        List<Long> chosenBackendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIds(replicationNum, true, true, clusterName);
+        if (chosenBackendIds == null) {
+            throw new DdlException("Failed to find enough host in all backends. need: " + replicationNum);
+        }
+        Preconditions.checkState(chosenBackendIds.size() == replicationNum);
+        return chosenBackendIds;
+    }
+
     // Drop table
     public void dropTable(DropTableStmt stmt) throws DdlException {
         String dbName = stmt.getDbName();
@@ -3902,6 +4057,10 @@ public void dropTable(DropTableStmt stmt) throws DdlException {
 
             DropInfo info = new DropInfo(db.getId(), table.getId(), -1L);
             editLog.logDropTable(info);
+
+            Catalog.getCurrentColocateIndex().removeTable(table.getId());
+            ColocatePersistInfo colocateInfo = ColocatePersistInfo.CreateForRemoveTable(table.getId());
+            editLog.logColocateRemoveTable(colocateInfo);
         } finally {
             db.writeUnlock();
         }
@@ -4551,6 +4710,35 @@ public void replayRenameTable(TableInfo tableInfo) throws DdlException {
         }
     }
 
+    //the invoker should keep db write lock
+    public void modifyTableColocate(Database db, OlapTable table, String colocateTable) throws DdlException {
+        Table parentTable = db.getTable(colocateTable);
+        if (parentTable != null && getColocateTableIndex().isColocateParentTable(parentTable.getId())) {
+            table.setColocateTable(colocateTable);
+            Map<String, String> properties = Maps.newHashMapWithExpectedSize(1);
+            properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, colocateTable);
+            TablePropertyInfo info = new TablePropertyInfo(db.getId(), table.getId(), properties);
+            editLog.logModifyTableColocate(info);
+        } else {
+            ErrorReport.reportDdlException(ErrorCode.ERR_COLOCATE_TABLE_NO_EXIT, colocateTable);
+        }
+    }
+
+    public void replayModifyTableColocate(TablePropertyInfo info) {
+        long dbId = info.getDbId();
+        long tableId = info.getTableId();
+        Map<String, String> properties = info.getPropertyMap();
+
+        Database db = getDb(dbId);
+        db.writeLock();
+        try {
+            OlapTable table = (OlapTable) db.getTable(tableId);
+            table.setColocateTable(properties.get(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH));
+        } finally {
+            db.writeUnlock();
+        }
+    }
+
     public void renameRollup(Database db, OlapTable table, RollupRenameClause renameClause) throws DdlException {
         if (table.getState() != OlapTableState.NORMAL) {
             throw new DdlException("Table[" + table.getName() + "] is under " + table.getState());
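The createTablets and chosenBackendIdBySeq changes in the Catalog.java diff above implement bucket-aligned placement: the first (parent) table of a colocate group picks backends per bucket and records the sequence, and every child table reuses it. A standalone sketch of that idea (hypothetical names, not Doris code):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical sketch of bucket-to-backend assignment for a colocate group:
// the first caller for a group chooses and records backends per bucket; later
// callers (child tables, later partitions) reuse the recorded sequence, so
// bucket i of every table in the group lands on the same backends.
public class BucketSeqDemo {
    private final Map<Long, List<List<Long>>> seqByGroup = new HashMap<>();
    private final Random rand = new Random();

    // replicationNum must not exceed backends.size().
    public List<List<Long>> assign(long groupId, int bucketNum, int replicationNum, List<Long> backends) {
        List<List<Long>> seq = seqByGroup.get(groupId);
        if (seq == null) {
            // parent table: choose distinct backends per bucket and record them
            seq = new ArrayList<>();
            for (int i = 0; i < bucketNum; i++) {
                List<Long> replicas = new ArrayList<>();
                while (replicas.size() < replicationNum) {
                    Long b = backends.get(rand.nextInt(backends.size()));
                    if (!replicas.contains(b)) {
                        replicas.add(b);
                    }
                }
                seq.add(replicas);
            }
            seqByGroup.put(groupId, seq);
        }
        // child tables fall through to the recorded sequence
        return seq;
    }
}
```

Recording the sequence only after the last bucket (as the real patch does before persisting) lets table creation roll back cleanly if any bucket fails.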
diff --git a/fe/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java b/fe/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
new file mode 100644
index 00000000..c751f289
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
@@ -0,0 +1,366 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.ColocatePersistInfo;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * maintain the colocate table related indexes and meta
+ */
+public class ColocateTableIndex implements Writable {
+    private ReentrantReadWriteLock lock;
+
+    //group_id -> table_ids
+    private Multimap<Long, Long> group2Tables;
+    //table_id -> group_id
+    private Map<Long, Long> table2Groups;
+    //group_id -> db_id
+    private Map<Long, Long> group2DBs;
+    //group_id -> bucketSeq -> backends
+    private Map<Long, List<List<Long>>> group2BackendsPerBucketSeq;
+    //the colocate group is balancing
+    private Set<Long> balancingGroups;
+
+    public ColocateTableIndex() {
+        lock = new ReentrantReadWriteLock();
+
+        group2Tables = ArrayListMultimap.create();
+        table2Groups = Maps.newHashMap();
+        group2DBs = Maps.newHashMap();
+        group2BackendsPerBucketSeq = Maps.newHashMap();
+        balancingGroups = new CopyOnWriteArraySet<Long>();
+    }
+
+    private final void readLock() {
+        this.lock.readLock().lock();
+    }
+
+    private final void readUnlock() {
+        this.lock.readLock().unlock();
+    }
+
+    private final void writeLock() {
+        this.lock.writeLock().lock();
+    }
+
+    private final void writeUnlock() {
+        this.lock.writeLock().unlock();
+    }
+
+    public void addTableToGroup(long dbId, long tableId, long groupId) {
+        writeLock();
+        try {
+            group2Tables.put(groupId, tableId);
+            group2DBs.put(groupId, dbId);
+            table2Groups.put(tableId, groupId);
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    public void addBackendsPerBucketSeq(long groupId, List<List<Long>> backendsPerBucketSeq) {
+        writeLock();
+        try {
+            group2BackendsPerBucketSeq.put(groupId, backendsPerBucketSeq);
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    public void markGroupBalancing(long groupId) {
+        balancingGroups.add(groupId);
+    }
+
+    public void markGroupStable(long groupId) {
+        balancingGroups.remove(groupId);
+    }
+
+    public void removeTable(long tableId) {
+        long groupId;
+        readLock();
+        try {
+            if (!table2Groups.containsKey(tableId)) {
+                return;
+            }
+            groupId = table2Groups.get(tableId);
+        } finally {
+            readUnlock();
+        }
+
+        removeTableFromGroup(tableId, groupId);
+    }
+
+    private void removeTableFromGroup(long tableId, long groupId) {
+        writeLock();
+        try {
+
+            if (groupId == tableId) {
+                //for parent table
+                group2Tables.removeAll(groupId);
+                group2BackendsPerBucketSeq.remove(groupId);
+                group2DBs.remove(groupId);
+                balancingGroups.remove(groupId);
+            } else {
+                //for child table
+                group2Tables.remove(groupId, tableId);
+            }
+            table2Groups.remove(tableId);
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    public void removeBackendsPerBucketSeq(long groupId) {
+        writeLock();
+        try {
+            group2BackendsPerBucketSeq.remove(groupId);
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    public boolean isGroupBalancing(long groupId) {
+        return balancingGroups.contains(groupId);
+    }
+
+    public boolean isColocateParentTable(long tableId) {
+        readLock();
+        try {
+            return group2Tables.containsKey(tableId);
+        } finally {
+            readUnlock();
+        }
+
+    }
+
+    public boolean isColocateTable(long tableId) {
+        readLock();
+        try {
+            return table2Groups.containsKey(tableId);
+        } finally {
+            readUnlock();
+        }
+    }
+
+    public long getGroup(long tableId) {
+        readLock();
+        try {
+            Preconditions.checkState(table2Groups.containsKey(tableId));
+            return table2Groups.get(tableId);
+        } finally {
+            readUnlock();
+        }
+    }
+
+    public Set<Long> getBalancingGroupIds() {
+        return balancingGroups;
+    }
+
+    public Set<Long> getAllGroupIds() {
+        readLock();
+        try {
+            return group2Tables.keySet();
+        } finally {
+            readUnlock();
+        }
+    }
+
+    public Set<Long> getBackendsByGroup(long groupId) {
+        readLock();
+        try {
+            Set<Long> allBackends = new HashSet<>();
+            List<List<Long>> backendsPerBucketSeq = group2BackendsPerBucketSeq.get(groupId);
+            for (List<Long> bes : backendsPerBucketSeq) {
+                allBackends.addAll(bes);
+            }
+            return allBackends;
+        } finally {
+            readUnlock();
+        }
+    }
+
+    public Long getDB(long group) {
+        readLock();
+        try {
+            Preconditions.checkState(group2DBs.containsKey(group));
+            return group2DBs.get(group);
+        } finally {
+            readUnlock();
+        }
+    }
+
+    public List<Long> getAllTableIds(long groupId) {
+        readLock();
+        try {
+            Preconditions.checkState(group2Tables.containsKey(groupId));
+            return new ArrayList<>(group2Tables.get(groupId));
+        } finally {
+            readUnlock();
+        }
+    }
+
+    public List<List<Long>> getBackendsPerBucketSeq(long groupId) {
+        readLock();
+        try {
+            List<List<Long>> backendsPerBucketSeq = group2BackendsPerBucketSeq.get(groupId);
+            if (backendsPerBucketSeq == null) {
+                return new ArrayList<List<Long>>();
+            }
+            return backendsPerBucketSeq;
+        } finally {
+            readUnlock();
+        }
+    }
+
+    public void replayAddTableToGroup(ColocatePersistInfo info) {
+        addTableToGroup(info.getDbId(), info.getTableId(), info.getGroupId());
+        //for parent table
+        if (info.getBackendsPerBucketSeq().size() > 0) {
+            addBackendsPerBucketSeq(info.getGroupId(), info.getBackendsPerBucketSeq());
+        }
+    }
+
+    public void replayAddBackendsPerBucketSeq(ColocatePersistInfo info) {
+        addBackendsPerBucketSeq(info.getGroupId(), info.getBackendsPerBucketSeq());
+    }
+
+    public void replayMarkGroupBalancing(ColocatePersistInfo info) {
+        markGroupBalancing(info.getGroupId());
+    }
+
+    public void replayMarkGroupStable(ColocatePersistInfo info) {
+        markGroupStable(info.getGroupId());
+    }
+
+    public void replayRemoveTable(ColocatePersistInfo info) {
+        removeTable(info.getTableId());
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        int size = group2Tables.asMap().size();
+        out.writeInt(size);
+        for (Map.Entry<Long, Collection<Long>> entry : group2Tables.asMap().entrySet()) {
+            out.writeLong(entry.getKey());
+            out.writeInt(entry.getValue().size());
+            for (Long tableId : entry.getValue()) {
+                out.writeLong(tableId);
+            }
+        }
+
+        size = table2Groups.size();
+        out.writeInt(size);
+        for (Map.Entry<Long, Long> entry : table2Groups.entrySet()) {
+            out.writeLong(entry.getKey());
+            out.writeLong(entry.getValue());
+        }
+
+        size = group2DBs.size();
+        out.writeInt(size);
+        for (Map.Entry<Long, Long> entry : group2DBs.entrySet()) {
+            out.writeLong(entry.getKey());
+            out.writeLong(entry.getValue());
+        }
+
+        size = group2BackendsPerBucketSeq.size();
+        out.writeInt(size);
+        for (Map.Entry<Long, List<List<Long>>> entry : group2BackendsPerBucketSeq.entrySet()) {
+            out.writeLong(entry.getKey());
+            out.writeInt(entry.getValue().size());
+            for (List<Long> bucket2BEs : entry.getValue()) {
+                out.writeInt(bucket2BEs.size());
+                for (Long be : bucket2BEs) {
+                    out.writeLong(be);
+                }
+            }
+        }
+
+        size = balancingGroups.size();
+        out.writeInt(size);
+        for (Long group : balancingGroups) {
+            out.writeLong(group);
+        }
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        int size = in.readInt();
+        for (int i = 0; i < size; i++) {
+            long group = in.readLong();
+            int tableSize = in.readInt();
+            List<Long> tables = new ArrayList<>();
+            for (int j = 0; j < tableSize; j++) {
+                tables.add(in.readLong());
+            }
+            group2Tables.putAll(group, tables);
+        }
+
+        size = in.readInt();
+        for (int i = 0; i < size; i++) {
+            long table = in.readLong();
+            long group = in.readLong();
+            table2Groups.put(table, group);
+        }
+
+        size = in.readInt();
+        for (int i = 0; i < size; i++) {
+            long group = in.readLong();
+            long db = in.readLong();
+            group2DBs.put(group, db);
+        }
+
+        size = in.readInt();
+        for (int i = 0; i < size; i++) {
+            long group = in.readLong();
+            List<List<Long>> bucketBeLists = new ArrayList<>();
+            int bucketBeListsSize = in.readInt();
+            for (int j = 0; j < bucketBeListsSize; j++) {
+                int beListSize = in.readInt();
+                List<Long> beLists = new ArrayList<>();
+                for (int k = 0; k < beListSize; k++) {
+                    beLists.add(in.readLong());
+                }
+                bucketBeLists.add(beLists);
+            }
+            group2BackendsPerBucketSeq.put(group, bucketBeLists);
+        }
+
+        size = in.readInt();
+        for (int i = 0; i < size; i++) {
+            long group = in.readLong();
+            balancingGroups.add(group);
+        }
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/catalog/ColocateTableUtils.java b/fe/src/main/java/org/apache/doris/catalog/ColocateTableUtils.java
new file mode 100644
index 00000000..68318613
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/catalog/ColocateTableUtils.java
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+
+import java.util.List;
+
+public class ColocateTableUtils {
+
+    static Table getColocateTable(Database db, String tableName) {
+        Table parentTable;
+        db.readLock();
+        try {
+            parentTable = db.getTable(tableName);
+        } finally {
+            db.readUnlock();
+        }
+        return parentTable;
+    }
+
+    static void checkTableExist(Table colocateTable, String colocateTableName) throws DdlException {
+        if (colocateTable == null) {
+            ErrorReport.reportDdlException(ErrorCode.ERR_COLOCATE_TABLE_NO_EXIT, colocateTableName);
+        }
+    }
+
+    static void checkTableType(Table colocateTable) throws DdlException {
+        if (colocateTable.type != Table.TableType.OLAP) {
+            ErrorReport.reportDdlException(ErrorCode.ERR_COLOCATE_TABLE_MUST_OLAP_TABLE, colocateTable.getName());
+        }
+    }
+
+    static void checkBucketNum(OlapTable parentTable, DistributionInfo childDistributionInfo) throws DdlException {
+        int parentBucketNum = parentTable.getDefaultDistributionInfo().getBucketNum();
+        if (parentBucketNum != childDistributionInfo.getBucketNum()) {
+            ErrorReport.reportDdlException(ErrorCode.ERR_COLOCATE_TABLE_MUST_SAME_BUCKNUM, parentBucketNum);
+        }
+    }
+
+    static void checkBucketNum(DistributionInfo oldDistributionInfo, DistributionInfo newDistributionInfo)
+            throws DdlException {
+        int oldBucketNum = oldDistributionInfo.getBucketNum();
+        if (oldBucketNum != newDistributionInfo.getBucketNum()) {
+            ErrorReport.reportDdlException(ErrorCode.ERR_COLOCATE_TABLE_MUST_SAME_BUCKNUM, oldBucketNum);
+        }
+    }
+
+    static void checkReplicationNum(OlapTable parentTable, PartitionInfo childPartitionInfo) throws DdlException {
+        short childReplicationNum = childPartitionInfo.idToReplicationNum.entrySet().iterator().next().getValue();
+        checkReplicationNum(parentTable.getPartitionInfo(), childReplicationNum);
+    }
+
+    static void checkReplicationNum(PartitionInfo rangePartitionInfo, short childReplicationNum) throws DdlException {
+        short oldReplicationNum = rangePartitionInfo.idToReplicationNum.entrySet().iterator().next().getValue();
+        checkReplicationNum(oldReplicationNum, childReplicationNum);
+    }
+
+    private static void checkReplicationNum(short oldReplicationNum, short newReplicationNum) throws DdlException {
+        if (oldReplicationNum != newReplicationNum) {
+            ErrorReport.reportDdlException(ErrorCode.ERR_COLOCATE_TABLE_MUST_SAME_REPLICAT_NUM, oldReplicationNum);
+        }
+    }
+
+    static void checkDistributionColumnSizeAndType(OlapTable parentTable, DistributionInfo childDistributionInfo)
+            throws DdlException {
+        HashDistributionInfo parentDistribution = (HashDistributionInfo) parentTable.getDefaultDistributionInfo();
+        List<Column> parentColumns = parentDistribution.getDistributionColumns();
+        List<Column> childColumns = ((HashDistributionInfo) childDistributionInfo).getDistributionColumns();
+
+        int parentColumnSize = parentColumns.size();
+        if (parentColumnSize != childColumns.size()) {
+            ErrorReport.reportDdlException(ErrorCode.ERR_COLOCATE_TABLE_SAME_DISTRIBUTED_COLUMNS_SIZE,
+                    parentColumnSize);
+        }
+
+        for (int i = 0; i < parentColumnSize; i++) {
+            String parentColumnName = parentColumns.get(i).getName();
+            ColumnType parentColumnType = parentColumns.get(i).getColumnType();
+            if (!parentColumnType.equals(childColumns.get(i).getColumnType())) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_COLOCATE_TABLE_SAME_DISTRIBUTED_COLUMNS_TYPE,
+                        parentColumnName, parentColumnType);
+            }
+        }
+    }
+
+}
diff --git a/fe/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/src/main/java/org/apache/doris/catalog/OlapTable.java
index b54e0383..254ffac9 100644
--- a/fe/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -105,6 +105,8 @@
     private Set<String> bfColumns;
     private double bfFpp;
 
+    private String colocateTable;
+
     public OlapTable() {
         // for persist
         super(TableType.OLAP);
@@ -122,6 +124,8 @@ public OlapTable() {
 
         this.bfColumns = null;
         this.bfFpp = 0;
+
+        this.colocateTable = null;
     }
 
     public OlapTable(long id, String tableName, List<Column> baseSchema,
@@ -148,6 +152,8 @@ public OlapTable(long id, String tableName, List<Column> baseSchema,
 
         this.bfColumns = null;
         this.bfFpp = 0;
+
+        this.colocateTable = null;
     }
 
     public void setState(OlapTableState state) {
@@ -519,6 +525,14 @@ public void setBloomFilterInfo(Set<String> bfColumns, double bfFpp) {
         this.bfColumns = bfColumns;
         this.bfFpp = bfFpp;
     }
+
+    public String getColocateTable() {
+        return colocateTable;
+    }
+
+    public void setColocateTable(String colocateTable) {
+        this.colocateTable = colocateTable;
+    }
     
    // when the table is creating new rollup and enter finishing state, should tell be not auto load to new rollup
     // it is used for stream load
@@ -765,6 +779,14 @@ public void write(DataOutput out) throws IOException {
             }
             out.writeDouble(bfFpp);
         }
+
+        //colocateTable
+        if (colocateTable == null) {
+            out.writeBoolean(false);
+        } else {
+            out.writeBoolean(true);
+            Text.writeString(out, colocateTable);
+        }
     }
 
     @Override
@@ -846,6 +868,12 @@ public void readFields(DataInput in) throws IOException {
                 bfFpp = in.readDouble();
             }
         }
+
+        if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_46) {
+            if (in.readBoolean()) {
+                colocateTable = Text.readString(in);
+            }
+        }
     }
 
     public boolean equals(Table table) {
diff --git a/fe/src/main/java/org/apache/doris/clone/BackendInfo.java b/fe/src/main/java/org/apache/doris/clone/BackendInfo.java
new file mode 100644
index 00000000..060b1406
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/clone/BackendInfo.java
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+public class BackendInfo {
+    private long backendId;
+    private String host;
+
+    private long totalCapacityB;
+    private long availableCapacityB;
+    // capacity for clone
+    private long cloneCapacityB;
+
+    private int tableReplicaNum;
+    // replica num for clone
+    private int cloneReplicaNum;
+
+    public BackendInfo(String host, long backendId, long totalCapacityB, long availableCapacityB) {
+        this.backendId = backendId;
+        this.totalCapacityB = totalCapacityB;
+        this.availableCapacityB = availableCapacityB;
+        this.host = host;
+        this.cloneCapacityB = 0L;
+        this.tableReplicaNum = 0;
+        this.cloneReplicaNum = 0;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public long getBackendId() {
+        return backendId;
+    }
+
+    public long getTotalCapacityB() {
+        return totalCapacityB;
+    }
+
+    public long getAvailableCapacityB() {
+        return availableCapacityB;
+    }
+
+    public void setCloneCapacityB(long cloneCapacityB) {
+        this.cloneCapacityB = cloneCapacityB;
+    }
+
+    public boolean canCloneByCapacity(long tabletSizeB) {
+        return cloneCapacityB > tabletSizeB;
+    }
+
+    public void decreaseCloneCapacityB(long tabletSizeB) {
+        cloneCapacityB -= tabletSizeB;
+    }
+
+    public int getTableReplicaNum() {
+        return tableReplicaNum;
+    }
+
+    public void setTableReplicaNum(int tableReplicaNum) {
+        this.tableReplicaNum = tableReplicaNum;
+    }
+
+    public void setCloneReplicaNum(int cloneReplicaNum) {
+        this.cloneReplicaNum = cloneReplicaNum;
+    }
+
+    public boolean canCloneByDistribution() {
+        return cloneReplicaNum > 1;
+    }
+
+    public void decreaseCloneReplicaNum() {
+        cloneReplicaNum -= 1;
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/clone/Clone.java b/fe/src/main/java/org/apache/doris/clone/Clone.java
index 8b43fc04..2fc86e02 100644
--- a/fe/src/main/java/org/apache/doris/clone/Clone.java
+++ b/fe/src/main/java/org/apache/doris/clone/Clone.java
@@ -120,8 +120,8 @@ public boolean addCloneJob(long dbId, long tableId, long partitionId, long index
             }
 
             // check job num
-            // TODO(cmy): for now we limit clone job num in all priority level.
-            if (jobNum >= Config.clone_max_job_num) {
+            // TODO(cmy): for now we only limit the clone job num for NORMAL and LOW priority clone jobs
+            if (priority != JobPriority.HIGH && jobNum >= Config.clone_max_job_num) {
                 LOG.debug("too many clone jobs. job num: {}", jobNum);
                 return false;
             }
diff --git a/fe/src/main/java/org/apache/doris/clone/CloneChecker.java b/fe/src/main/java/org/apache/doris/clone/CloneChecker.java
index 58a807d0..de9e9a51 100644
--- a/fe/src/main/java/org/apache/doris/clone/CloneChecker.java
+++ b/fe/src/main/java/org/apache/doris/clone/CloneChecker.java
@@ -127,7 +127,7 @@ public boolean checkTabletForSupplement(long dbId, long tableId, long partitionI
         }
 
         // 3. init backends distribution info
-        TabletInfo tabletInfo = null;
+        CloneTabletInfo tabletInfo = null;
         db.readLock();
         try {
             OlapTable olapTable = (OlapTable) db.getTable(tableId);
@@ -136,6 +136,11 @@ public boolean checkTabletForSupplement(long dbId, long tableId, long partitionI
                 return false;
             }
 
+            if (olapTable.getColocateTable() != null) {
+                LOG.debug("{} is colocate table, ColocateTableBalancer will handle. Skip", olapTable.getName());
+                return false;
+            }
+
             Partition partition = olapTable.getPartition(partitionId);
             if (partition == null) {
                 LOG.warn("partition does not exist. id: {}", partitionId);
@@ -182,7 +187,7 @@ public boolean checkTabletForSupplement(long dbId, long tableId, long partitionI
 
                 if (tablet.getId() == tabletId) {
                     foundTablet = true;
-                    tabletInfo = new TabletInfo(dbId, tableId, partitionId, indexId, tabletId, replicationNum,
+                    tabletInfo = new CloneTabletInfo(dbId, tableId, partitionId, indexId, tabletId, replicationNum,
                             onlineReplicaNum, tabletSizeB, backendIds);
                 }
             }
@@ -424,6 +429,12 @@ private void checkTablets() {
                     }
 
                     OlapTable olapTable = (OlapTable) table;
+
+                    if (olapTable.getColocateTable() != null) {
+                        LOG.debug("{} is colocate table, ColocateTableBalancer will handle. Skip", olapTable.getName());
+                        continue;
+                    }
+
                     tableId = table.getId();
                     for (Partition partition : olapTable.getPartitions()) {
                         long partitionId = partition.getId();
@@ -454,11 +465,11 @@ private void checkTablets() {
 
                     // init table clone info
                     // backend id -> tablet info set, to gather statistics of tablet infos of each backends
-                    Map<Long, Set<TabletInfo>> backendToTablets = Maps.newHashMap();
+                    Map<Long, Set<CloneTabletInfo>> backendToTablets = Maps.newHashMap();
                     // tablet id -> tablet info, tablets which need to be cloned.
-                    Map<Long, TabletInfo> cloneTabletMap = Maps.newHashMap();
+                    Map<Long, CloneTabletInfo> cloneTabletMap = Maps.newHashMap();
                     // tablets which have redundant replicas.
-                    Set<TabletInfo> deleteTabletSet = Sets.newHashSet();
+                    Set<CloneTabletInfo> deleteTabletSet = Sets.newHashSet();
                     db.readLock();
                     try {
                         long indexId = index.getId();
@@ -505,16 +516,16 @@ private void checkTablets() {
                                 } 
                             }
 
-                            TabletInfo tabletInfo = new TabletInfo(dbId, tableId, partitionId, indexId, tabletId,
+                            CloneTabletInfo tabletInfo = new CloneTabletInfo(dbId, tableId, partitionId, indexId, tabletId,
+                                                                    replicationNum, onlineReplicaNum,
+                                                                    tabletSizeB, beIdsOfReplica);
                             tabletInfo.setDbState(db.getDbState());
                             
                             // gather statistics of tablet infos of each 
backends
                             for (long backendId : beIdsOfReplica) {
-                                Set<TabletInfo> tabletInfos = backendToTablets.get(backendId);
+                                Set<CloneTabletInfo> tabletInfos = backendToTablets.get(backendId);
                                 if (tabletInfos == null) {
-                                    tabletInfos = new HashSet<TabletInfo>();
+                                    tabletInfos = new HashSet<CloneTabletInfo>();
                                     backendToTablets.put(backendId, tabletInfos);
                                 }
                                 tabletInfos.add(tabletInfo);
@@ -550,7 +561,7 @@ private void checkTablets() {
                     }
 
                     // init backend tablet distribution
-                    for (Map.Entry<Long, Set<TabletInfo>> mapEntry : backendToTablets.entrySet()) {
+                    for (Map.Entry<Long, Set<CloneTabletInfo>> mapEntry : backendToTablets.entrySet()) {
                         long backendId = mapEntry.getKey();
                         if (backendInfosInCluster.containsKey(backendId)) {
                             final BackendInfo backendInfo = backendInfosInCluster.get(backendId);
@@ -578,7 +589,7 @@ private void checkTablets() {
                             initBackendDistributionInfos(backendInfosInCluster);
                     if (distributionLevelToBackendIds != null && !distributionLevelToBackendIds.isEmpty()) {
                         // delete redundant replicas
-                        for (TabletInfo tabletInfo : deleteTabletSet) {
+                        for (CloneTabletInfo tabletInfo : deleteTabletSet) {
                             deleteRedundantReplicas(db, tabletInfo, distributionLevelToBackendIds);
                         }
                     } else {
@@ -599,7 +610,7 @@ private void checkTablets() {
         } // end for dbs
     }
 
-    private Map<Long, BackendInfo> initBackendInfos(String clusterName) {
+    Map<Long, BackendInfo> initBackendInfos(String clusterName) {
         Map<Long, BackendInfo> backendInfos = Maps.newHashMap();
         SystemInfoService clusterInfoService = Catalog.getCurrentSystemInfo();
         List<Long> backendIds = null;
@@ -630,7 +641,7 @@ private void checkTablets() {
      * @param backendInfos
      * @return CapLvl to Set of BE List, each BE list represents all BE in one 
host
      */
-    private Map<CapacityLevel, Set<List<Long>>> initBackendCapacityInfos(Map<Long, BackendInfo> backendInfos) {
+    Map<CapacityLevel, Set<List<Long>>> initBackendCapacityInfos(Map<Long, BackendInfo> backendInfos) {
         Preconditions.checkNotNull(backendInfos);
         if (backendInfos.size() == 0) {
             return null;
@@ -717,7 +728,7 @@ private void checkTablets() {
      * @param backendInfos
      * @return
      */
-    private Map<CapacityLevel, Set<List<Long>>> initBackendDistributionInfos(Map<Long, BackendInfo> backendInfos) {
+    Map<CapacityLevel, Set<List<Long>>> initBackendDistributionInfos(Map<Long, BackendInfo> backendInfos) {
         Preconditions.checkNotNull(backendInfos);
         if (backendInfos.size() == 0) {
             return null;
@@ -801,10 +812,10 @@ private long selectRandomBackendId(List<Long> candidateBackendIds, Set<String> e
      * 2. if supplement, select from 2.1 low distribution and capacity 2.2 low
      * distribution 2.3 all order by distribution
      */
-    private long selectCloneReplicaBackendId(
+    public long selectCloneReplicaBackendId(
             Map<CapacityLevel, Set<List<Long>>> distributionLevelToBackendIds,
             Map<CapacityLevel, Set<List<Long>>> capacityLevelToBackendIds, 
-            Map<Long, BackendInfo> backendInfos, TabletInfo tabletInfo,
+            Map<Long, BackendInfo> backendInfos, CloneTabletInfo tabletInfo,
             JobType jobType, JobPriority priority) {
         Set<Long> existBackendIds = tabletInfo.getBackendIds();
         long tabletSizeB = tabletInfo.getTabletSizeB();
@@ -929,7 +940,7 @@ private long selectCloneReplicaBackendId(
      * (1) offline > clone > low version > high distribution out of cluster (2) offline > clone > low version > high
      * distribution in cluster
      */
-    private void deleteRedundantReplicas(Database db, TabletInfo tabletInfo,
+    private void deleteRedundantReplicas(Database db, CloneTabletInfo tabletInfo,
             Map<CapacityLevel, Set<List<Long>>> distributionLevelToBackendIds) {
         long tableId = tabletInfo.getTableId();
         long partitionId = tabletInfo.getPartitionId();
@@ -945,6 +956,11 @@ private void deleteRedundantReplicas(Database db, TabletInfo tabletInfo,
                 return;
             }
 
+            if (olapTable.getColocateTable() != null) {
+                LOG.debug("{} is colocate table, ColocateTableBalancer will handle. Skip", olapTable.getName());
+                return;
+            }
+
             Partition partition = olapTable.getPartition(partitionId);
             if (partition == null) {
                 LOG.warn("partition does not exist. id: {}", partitionId);
@@ -1142,17 +1158,17 @@ public int compare(Replica arg0, Replica arg1) {
         
     }
 
-    private void checkSupplement(Map<Long, TabletInfo> cloneTabletMap,
+    private void checkSupplement(Map<Long, CloneTabletInfo> cloneTabletMap,
             Map<CapacityLevel, Set<List<Long>>> distributionLevelToBackendIds, 
             Map<CapacityLevel, Set<List<Long>>> capacityLevelToBackendIds,
             Map<Long, BackendInfo> backendInfos) {
-        for (TabletInfo tabletInfo : cloneTabletMap.values()) {
+        for (CloneTabletInfo tabletInfo : cloneTabletMap.values()) {
             addCloneJob(tabletInfo, distributionLevelToBackendIds, capacityLevelToBackendIds, backendInfos,
                     JobType.SUPPLEMENT);
         }
     }
 
-    private void checkMigration(Map<Long, Set<TabletInfo>> backendToTablets,
+    private void checkMigration(Map<Long, Set<CloneTabletInfo>> backendToTablets,
             Map<CapacityLevel, Set<List<Long>>> distributionLevelToBackendIds, Map<CapacityLevel,
             Set<List<Long>>> capacityLevelToBackendIds,
             Map<Long, BackendInfo> backendInfos) {
@@ -1165,7 +1181,7 @@ private void checkMigration(Map<Long, Set<TabletInfo>> backendToTablets,
             return;
         }
 
-        Set<TabletInfo> candidateMigrationTablets = Sets.newHashSet();
+        Set<CloneTabletInfo> candidateMigrationTablets = Sets.newHashSet();
         for (List<Long> backendIds : highBackendIds) {
             // select one backend in same host
             Collections.shuffle(backendIds);
@@ -1177,8 +1193,8 @@ private void checkMigration(Map<Long, Set<TabletInfo>> backendToTablets,
             LOG.debug("no tablets for migration");
             return;
         }
-        List<TabletInfo> migrationTablets = null;
-        List<TabletInfo> candidateTablets = Lists.newArrayList(candidateMigrationTablets);
+        List<CloneTabletInfo> migrationTablets = null;
+        List<CloneTabletInfo> candidateTablets = Lists.newArrayList(candidateMigrationTablets);
         if (candidateTablets.size() <= CHECK_TABLE_TABLET_NUM_PER_MIGRATION_CYCLE) {
             migrationTablets = candidateTablets;
         } else {
@@ -1190,13 +1206,13 @@ private void checkMigration(Map<Long, Set<TabletInfo>> backendToTablets,
         }
 
         // add clone job
-        for (TabletInfo tabletInfo : migrationTablets) {
+        for (CloneTabletInfo tabletInfo : migrationTablets) {
             addCloneJob(tabletInfo, distributionLevelToBackendIds, capacityLevelToBackendIds, backendInfos,
                     JobType.MIGRATION);
         }
     }
 
-    private void addCloneJob(TabletInfo tabletInfo, Map<CapacityLevel,
+    private void addCloneJob(CloneTabletInfo tabletInfo, Map<CapacityLevel,
                              Set<List<Long>>> distributionLevelToBackendIds,
                              Map<CapacityLevel, Set<List<Long>>> capacityLevelToBackendIds,
                              Map<Long, BackendInfo> backendInfos, JobType jobType) {
@@ -1470,161 +1486,5 @@ private boolean checkPassDelayTime(CloneJob job) {
         HIGH, MID, LOW
     }
 
-    private class TabletInfo {
-        private long dbId;
-        private long tableId;
-        private long partitionId;
-        private long indexId;
-        private long tabletId;
-        private short replicationNum;
-        private short onlineReplicaNum;
-        private long tabletSizeB;
-        private Set<Long> backendIds;
-        private DbState dbState;
-
-        public TabletInfo(long dbId, long tableId, long partitionId, long 
indexId, long tabletId, short replicationNum,
-                short onlineReplicaNum, long tabletSizeB, Set<Long> 
backendIds) {
-            this.dbId = dbId;
-            this.tableId = tableId;
-            this.partitionId = partitionId;
-            this.indexId = indexId;
-            this.tabletId = tabletId;
-            this.replicationNum = replicationNum;
-            this.onlineReplicaNum = onlineReplicaNum;
-            this.tabletSizeB = tabletSizeB;
-            this.backendIds = backendIds;
-            this.dbState = DbState.NORMAL;
-        }
-
-        public long getDbId() {
-            return dbId;
-        }
-
-        public long getTableId() {
-            return tableId;
-        }
-
-        public long getPartitionId() {
-            return partitionId;
-        }
-
-        public long getIndexId() {
-            return indexId;
-        }
-
-        public long getTabletId() {
-            return tabletId;
-        }
-
-        public short getReplicationNum() {
-            return replicationNum;
-        }
-
-        public short getOnlineReplicaNum() {
-            return onlineReplicaNum;
-        }
-
-        public long getTabletSizeB() {
-            return tabletSizeB;
-        }
-
-        public Set<Long> getBackendIds() {
-            return backendIds;
-        }
-
-        @Override
-        public String toString() {
-            return "TabletInfo [dbId=" + dbId + ", tableId=" + tableId + ", 
partitionId=" + partitionId + ", indexId="
-                    + indexId + ", tabletId=" + tabletId + ", replicationNum=" 
+ replicationNum + ", onlineReplicaNum="
-                    + onlineReplicaNum + ", tabletSizeB=" + tabletSizeB + ", 
backendIds=" + backendIds + "]";
-        }
-
-        public DbState getDbState() {
-            return dbState;
-        }
-
-        public void setDbState(DbState dbState) {
-            this.dbState = dbState;
-        }
-    }
-
-    private class BackendInfo {
-        private long backendId;
-        private String host;
-        
-        private long totalCapacityB;
-        private long availableCapacityB;
-        // capacity for clone
-        private long cloneCapacityB;
-
-        private int tableReplicaNum;
-        // replica num for clone
-        private int cloneReplicaNum;
-
-        public BackendInfo(String host, long backendId, long totalCapacityB, 
long availableCapacityB) {
-            this.backendId = backendId;
-            this.totalCapacityB = totalCapacityB;
-            this.availableCapacityB = availableCapacityB;
-            this.host = host;
-            this.cloneCapacityB = 0L;
-            this.tableReplicaNum = 0;
-            this.cloneReplicaNum = 0;
-        }
-
-        public String getHost() {
-            return host;
-        }
-
-        public long getBackendId() {
-            return backendId;
-        }
-
-        public long getTotalCapacityB() {
-            return totalCapacityB;
-        }
-
-        public long getAvailableCapacityB() {
-            return availableCapacityB;
-        }
-
-        public void setCloneCapacityB(long cloneCapacityB) {
-            this.cloneCapacityB = cloneCapacityB;
-        }
-
-        public boolean canCloneByCapacity(long tabletSizeB) {
-            if (cloneCapacityB <= tabletSizeB) {
-                return false;
-            }
-            return true;
-        }
-
-        public void decreaseCloneCapacityB(long tabletSizeB) {
-            cloneCapacityB -= tabletSizeB;
-        }
-
-        public int getTableReplicaNum() {
-            return tableReplicaNum;
-        }
-
-        public void setTableReplicaNum(int tableReplicaNum) {
-            this.tableReplicaNum = tableReplicaNum;
-        }
-
-        public void setCloneReplicaNum(int cloneReplicaNum) {
-            this.cloneReplicaNum = cloneReplicaNum;
-        }
-
-        public boolean canCloneByDistribution() {
-            if (cloneReplicaNum <= 1) {
-                return false;
-            }
-            return true;
-        }
-
-        public void decreaseCloneReplicaNum() {
-            cloneReplicaNum -= 1;
-        }
-    }
-
 }
 
diff --git a/fe/src/main/java/org/apache/doris/clone/CloneTabletInfo.java 
b/fe/src/main/java/org/apache/doris/clone/CloneTabletInfo.java
new file mode 100644
index 00000000..f16affe8
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/clone/CloneTabletInfo.java
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import org.apache.doris.catalog.Database;
+
+import java.util.Set;
+
+public class CloneTabletInfo {
+    private long dbId;
+    private long tableId;
+    private long partitionId;
+    private long indexId;
+    private long tabletId;
+    private short replicationNum;
+    private short onlineReplicaNum;
+    private long tabletSizeB;
+    private Set<Long> backendIds;
+    private Database.DbState dbState;
+
+    public CloneTabletInfo(long dbId, long tableId, long partitionId, long 
indexId, long tabletId, short replicationNum,
+                           short onlineReplicaNum, long tabletSizeB, Set<Long> 
backendIds) {
+        this.dbId = dbId;
+        this.tableId = tableId;
+        this.partitionId = partitionId;
+        this.indexId = indexId;
+        this.tabletId = tabletId;
+        this.replicationNum = replicationNum;
+        this.onlineReplicaNum = onlineReplicaNum;
+        this.tabletSizeB = tabletSizeB;
+        this.backendIds = backendIds;
+        this.dbState = Database.DbState.NORMAL;
+    }
+
+    public long getDbId() {
+        return dbId;
+    }
+
+    public long getTableId() {
+        return tableId;
+    }
+
+    public long getPartitionId() {
+        return partitionId;
+    }
+
+    public long getIndexId() {
+        return indexId;
+    }
+
+    public long getTabletId() {
+        return tabletId;
+    }
+
+    public short getReplicationNum() {
+        return replicationNum;
+    }
+
+    public short getOnlineReplicaNum() {
+        return onlineReplicaNum;
+    }
+
+    public long getTabletSizeB() {
+        return tabletSizeB;
+    }
+
+    public Set<Long> getBackendIds() {
+        return backendIds;
+    }
+
+    @Override
+    public String toString() {
+        return "TabletInfo [dbId=" + dbId + ", tableId=" + tableId + ", 
partitionId=" + partitionId + ", indexId="
+                + indexId + ", tabletId=" + tabletId + ", replicationNum=" + 
replicationNum + ", onlineReplicaNum="
+                + onlineReplicaNum + ", tabletSizeB=" + tabletSizeB + ", 
backendIds=" + backendIds + "]";
+    }
+
+    public Database.DbState getDbState() {
+        return dbState;
+    }
+
+    public void setDbState(Database.DbState dbState) {
+        this.dbState = dbState;
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/clone/ColocateTableBalancer.java 
b/fe/src/main/java/org/apache/doris/clone/ColocateTableBalancer.java
new file mode 100644
index 00000000..9d71f996
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/clone/ColocateTableBalancer.java
@@ -0,0 +1,616 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Sets;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.ColocateTableIndex;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.util.Daemon;
+import org.apache.doris.persist.ColocatePersistInfo;
+import org.apache.doris.persist.ReplicaPersistInfo;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * When a backend is removed, goes down, or is added, balance the colocate tablets.
+ * Some of the work is delegated to {@link CloneChecker};
+ * CloneChecker itself does not handle colocate tablets.
+ */
+public class ColocateTableBalancer extends Daemon {
+    private static final Logger LOG = 
LogManager.getLogger(ColocateTableBalancer.class);
+
+    private ColocateTableBalancer(long intervalMs) {
+        super("colocate group clone checker", intervalMs);
+    }
+
+    private static ColocateTableBalancer INSTANCE = null;
+
+    public static ColocateTableBalancer getInstance() {
+        if (INSTANCE == null) {
+            INSTANCE = new 
ColocateTableBalancer(Config.clone_checker_interval_second * 1000L);
+        }
+        return INSTANCE;
+    }
+
+    @Override
+    /**
+     * First, try to mark a balancing group stable if its balance has finished.
+     * Second, try to delete redundant replicas for colocate groups whose balance
+     *         has just finished; deletion is delayed until all clone jobs are done.
+     * Third, try to balance colocate groups when a backend is removed, down, or added.
+     */
+    protected void runOneCycle() {
+        tryMarkBalancingGroupStable();
+
+        tryDeleteRedundantReplicas();
+
+        tryBalanceWhenBackendChange();
+    }
+
+    /**
+     * Check all tables of the balancing colocate groups.
+     * If every table in a balancing group is stable, mark the group stable.
+     */
+    private synchronized void tryMarkBalancingGroupStable() {
+        ColocateTableIndex colocateIndex = Catalog.getCurrentColocateIndex();
+        Catalog catalog = Catalog.getInstance();
+
+        Set<Long> allGroups = colocateIndex.getAllGroupIds();
+        for (Long group : allGroups) {
+            LOG.info("colocate group: {} backendsPerBucketSeq is {}", group, 
colocateIndex.getBackendsPerBucketSeq(group));
+        }
+
+        Set<Long> balancingGroupIds = colocateIndex.getBalancingGroupIds();
+        if (balancingGroupIds.size() == 0) {
+            LOG.info("All colocate groups are stable. Skip");
+            return;
+        }
+
+        for (Long groupId : balancingGroupIds) {
+            Database db = catalog.getDb(colocateIndex.getDB(groupId));
+
+            boolean isBalancing = false;
+            List<Long> allTableIds = colocateIndex.getAllTableIds(groupId);
+            for (long tableId : allTableIds) {
+                OlapTable olapTable = (OlapTable) db.getTable(tableId);
+                if (checkTableBalancing(db, olapTable, 
colocateIndex.getBackendsPerBucketSeq(groupId))) {
+                    isBalancing = true;
+                    break;
+                }
+            }
+
+            if (!isBalancing) {
+                colocateIndex.markGroupStable(groupId);
+                ColocatePersistInfo info = 
ColocatePersistInfo.CreateForMarkStable(groupId);
+                Catalog.getInstance().getEditLog().logColocateMarkStable(info);
+                LOG.info("colocate group : {} become stable!", 
db.getTable(groupId).getName());
+            }
+        }
+    }
+
+    /**
+     * 1. Check whether the colocate table is balancing.
+     *    A colocate table is stable when:
+     *    a. no replica is in the CLONE state;
+     *    b. each tablet's backendIds are consistent with the
+     *       backendsPerBucketSeq in ColocateTableIndex.
+     *
+     * 2. If the colocate table is balancing, try to add a clone job.
+     *    This handles the case where FE restarts while colocate groups are
+     *    balancing: after an FE restart, the clone job meta is lost.
+     */
+    private boolean checkTableBalancing(Database db, OlapTable olapTable, 
List<List<Long>> backendsPerBucketSeq) {
+        boolean isBalancing = false;
+        out: for (Partition partition : olapTable.getPartitions()) {
+            short replicateNum = 
olapTable.getPartitionInfo().getReplicationNum(partition.getId());
+            for (MaterializedIndex index : partition.getMaterializedIndices()) 
{
+                List<Tablet> tablets = index.getTablets();
+                for (int i = 0; i < tablets.size(); i++) {
+                    Tablet tablet = tablets.get(i);
+                    //1 check all replica state is not clone
+                    for (Replica replica : tablet.getReplicas()) {
+                        if 
(replica.getState().equals(Replica.ReplicaState.CLONE)) {
+                            isBalancing = true;
+                            LOG.info("colocate group : {} is still balancing, 
there is a replica in CLONE state", olapTable.getColocateTable());
+                            break out;
+                        }
+                    }
+
+                    List<Long> groupBackends = new 
ArrayList<>(backendsPerBucketSeq.get(i));
+                    Set<Long> tabletBackends = tablet.getBackendIds();
+                    //2 check the tablet backendIds are consistent with 
ColocateTableIndex's backendsPerBucketSeq
+                    if (!tabletBackends.containsAll(groupBackends)) {
+                        isBalancing = true;
+                        LOG.info("colocate group : {} is still balancing, maybe the clone job hasn't run; try adding a clone job", olapTable.getColocateTable());
+
+                        //try adding a clone job
+                        //CloneChecker.addCloneJob checks for duplicates, so this has no side effect
+                        List<Long> clusterAliveBackendIds = 
getAliveClusterBackendIds(db.getClusterName());
+                        groupBackends.removeAll(tabletBackends);
+
+                        //for backend added;
+                        if 
(clusterAliveBackendIds.containsAll(tabletBackends)) {
+                            //we can ignore tabletSizeB parameter here
+                            CloneTabletInfo tabletInfo = new 
CloneTabletInfo(db.getId(), olapTable.getId(), partition.getId(),
+                                    index.getId(), tablet.getId(), 
replicateNum, replicateNum, 0, tabletBackends);
+
+                            for (Long cloneBackend : groupBackends) {
+                                AddMigrationJob(tabletInfo, cloneBackend);
+                            }
+                        } else { //for backend down or removed
+                            short onlineReplicaNum = (short) (replicateNum - 
groupBackends.size());
+                            CloneTabletInfo tabletInfo = new 
CloneTabletInfo(db.getId(), olapTable.getId(), partition.getId(),
+                                    index.getId(), tablet.getId(), 
replicateNum, onlineReplicaNum, 0, tabletBackends);
+
+                            for (Long cloneBackend : groupBackends) {
+                                AddSupplementJob(tabletInfo, cloneBackend);
+                            }
+                        }
+                    }
+                } //end tablet
+            } //end index
+        } //end partition
+        return isBalancing;
+    }
+
+    /**
+     * firstly, check backend removed or down
+     * secondly, check backend added
+     */
+    private synchronized void tryBalanceWhenBackendChange() {
+        ColocateTableIndex colocateIndex = Catalog.getCurrentColocateIndex();
+        Catalog catalog = Catalog.getInstance();
+
+        Set<Long> allGroupIds = colocateIndex.getAllGroupIds();
+        for (Long groupId : allGroupIds) {
+            Database db = catalog.getDb(colocateIndex.getDB(groupId));
+            List<Long> clusterAliveBackendIds = 
getAliveClusterBackendIds(db.getClusterName());
+            Set<Long> allGroupBackendIds = 
colocateIndex.getBackendsByGroup(groupId);
+            List<List<Long>> backendsPerBucketSeq = 
colocateIndex.getBackendsPerBucketSeq(groupId);
+
+            //1 check backend removed or down
+            if (!clusterAliveBackendIds.containsAll(allGroupBackendIds)) {
+                Set<Long> removedBackendIds = 
Sets.newHashSet(allGroupBackendIds);
+                removedBackendIds.removeAll(clusterAliveBackendIds);
+
+                //A backend in Colocate group but not alive, which means the 
backend is removed or down
+                Iterator removedBackendIdsIterator = 
removedBackendIds.iterator();
+                while (removedBackendIdsIterator.hasNext()) {
+                    Long removedBackendId = (Long) 
removedBackendIdsIterator.next();
+                    Backend removedBackend = 
Catalog.getCurrentSystemInfo().getBackend(removedBackendId);
+                    if (removedBackend != null && 
!removedBackend.isDecommissioned() && System.currentTimeMillis() - 
removedBackend.getLastUpdateMs() < Config.max_backend_down_time_second * 1000) {
+                        LOG.info("backend[{}-{}] is down for a short time. 
ignore", removedBackend, removedBackend.getHost());
+                        removedBackendIdsIterator.remove();
+                    }
+                }
+
+                if (!removedBackendIds.isEmpty()) {
+                    LOG.info("removedBackendIds {} for colocate group {}", 
removedBackendIds, groupId);
+                    //removing multiple backends at once is unusual, so handle them one by one
+                    for (Long backendId : removedBackendIds) {
+                        balanceForBackendRemoved(db, groupId, backendId);
+                    }
+                    continue; //for one colocate group, only handle backend 
removed or added event once
+                }
+            }
+
+            //2 check backend added
+            int replicateNum = backendsPerBucketSeq.get(0).size();
+            if (backendsPerBucketSeq.size() * replicateNum <= 
allGroupBackendIds.size()) {
+                //if each tablet replica is on a different backend, the colocate group
+                //is fully balanced, and the newly added backend can be ignored.
+                LOG.info("colocate group {} has already fully balanced. skip", 
groupId);
+                continue;
+            }
+
+            if (clusterAliveBackendIds.size() > allGroupBackendIds.size()) {
+                clusterAliveBackendIds.removeAll(allGroupBackendIds);
+
+                if (!clusterAliveBackendIds.isEmpty()) {
+                    LOG.info("new backends for colocate group {} are {}", 
groupId, clusterAliveBackendIds);
+                    balanceForBackendAdded(groupId, db, 
clusterAliveBackendIds);
+                }
+            }
+        }
+    }
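The "fully balanced" short-circuit in tryBalanceWhenBackendChange above compares the number of bucket/replica slots against the number of distinct backends the group already uses. A standalone sketch of that check (class and parameter names are hypothetical, not part of this patch):

```java
import java.util.List;

public class ColocateBalanceCheck {
    // A colocate group is fully balanced when it already spreads its
    // bucket/replica slots over at least as many distinct backends as
    // there are slots, i.e. every replica can sit on its own backend.
    static boolean isFullyBalanced(List<List<Long>> backendsPerBucketSeq,
                                   int groupBackendNum) {
        int replicateNum = backendsPerBucketSeq.get(0).size();
        return backendsPerBucketSeq.size() * replicateNum <= groupBackendNum;
    }
}
```

When this holds, a newly added backend cannot improve the distribution, so the balancer skips the group.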
+
+    //get the backends that: 1 belong to this cluster; 2 are alive; 3 are not decommissioned
+    private List<Long> getAliveClusterBackendIds(String clusterName) {
+        SystemInfoService systemInfo = Catalog.getCurrentSystemInfo();
+        List<Long> clusterBackendIds = 
systemInfo.getClusterBackendIds(clusterName, true);
+        List<Long> decommissionedBackendIds = 
systemInfo.getDecommissionedBackendIds();
+        clusterBackendIds.removeAll(decommissionedBackendIds);
+        return clusterBackendIds;
+    }
+
+    private synchronized void tryDeleteRedundantReplicas() {
+        ColocateTableIndex colocateIndex = Catalog.getCurrentColocateIndex();
+        Catalog catalog = Catalog.getInstance();
+
+        Set<Long> allGroupIds = colocateIndex.getAllGroupIds();
+        for (Long groupId : allGroupIds) {
+            Set<Long> balancingGroups = colocateIndex.getBalancingGroupIds();
+            //only delete redundant replicas when the group is stable
+            if (!balancingGroups.contains(groupId)) {
+                Database db = catalog.getDb(colocateIndex.getDB(groupId));
+                List<Long> allTableIds = colocateIndex.getAllTableIds(groupId);
+                Set<CloneTabletInfo> deleteTabletSet = 
Sets.newHashSet();//keep tablets which have redundant replicas.
+                int replicateNum = -1;
+                for (long tableId : allTableIds) {
+                    db.readLock();
+                    try {
+                        OlapTable olapTable = (OlapTable) db.getTable(tableId);
+                        for (Partition partition : olapTable.getPartitions()) {
+                            replicateNum = 
olapTable.getPartitionInfo().getReplicationNum(partition.getId());
+                            for (MaterializedIndex index : 
partition.getMaterializedIndices()) {
+                                // only check NORMAL index
+                                if (index.getState() != 
MaterializedIndex.IndexState.NORMAL) {
+                                    continue;
+                                }
+                                for (Tablet tablet : index.getTablets()) {
+                                    List<Replica> replicas = 
tablet.getReplicas();
+                                    if (replicas.size() > replicateNum) {
+                                        CloneTabletInfo tableInfo = new 
CloneTabletInfo(db.getId(), tableId, partition.getId(), index.getId(), 
tablet.getId(), (short) replicateNum, (short) replicas.size(), 0, new 
HashSet<>());
+                                        deleteTabletSet.add(tableInfo);
+                                    }
+                                }
+                            }
+                        }
+                    } finally {
+                        db.readUnlock();
+                    }
+                }
+
+                if (deleteTabletSet.size() > 0) {
+                    LOG.info("colocate group {} will delete tablet {}", 
groupId, deleteTabletSet);
+                    //deleting tablets affects colocate table local query scheduling,
+                    //so mark the colocate group as balancing again
+                    colocateIndex.markGroupBalancing(groupId);
+                    ColocatePersistInfo info = 
ColocatePersistInfo.CreateForMarkBalancing(groupId);
+                    
Catalog.getInstance().getEditLog().logColocateMarkBalancing(info);
+                    for (CloneTabletInfo tabletInfo : deleteTabletSet) {
+                        deleteRedundantReplicas(db, tabletInfo);
+                    }
+                }
+
+            }
+        }
+    }
+
+    private void deleteRedundantReplicas(Database db, CloneTabletInfo 
tabletInfo) {
+        long tableId = tabletInfo.getTableId();
+        long partitionId = tabletInfo.getPartitionId();
+        long indexId = tabletInfo.getIndexId();
+        long tabletId = tabletInfo.getTabletId();
+
+        db.writeLock();
+        try {
+            OlapTable olapTable = (OlapTable) db.getTable(tableId);
+            Partition partition = olapTable.getPartition(partitionId);
+            MaterializedIndex index = partition.getIndex(indexId);
+            Tablet tablet = index.getTablet(tabletId);
+            List<Replica> replicas = tablet.getReplicas();
+
+            //delete replica for backend removed
+            List<Replica> copyReplicas = new ArrayList<>(replicas);
+            for (Replica replica : copyReplicas) {
+                long backendId = replica.getBackendId();
+                if 
(!Catalog.getCurrentSystemInfo().checkBackendAvailable(backendId)) {
+                    deleteReplica(tablet, replica, db.getId(), tableId, 
partitionId, indexId);
+                }
+            }
+
+            //delete replica for backend added
+            List<Replica> updatedReplicas = tablet.getReplicas();
+            short replicationNum = 
olapTable.getPartitionInfo().getReplicationNum(partition.getId());
+            if (updatedReplicas.size() <= replicationNum) {
+                return;
+            }
+
+            int deleteNum = updatedReplicas.size() - replicationNum;
+            List<Long> sortedReplicaIds = sortReplicaId(updatedReplicas);
+
+            //always delete the replicas with the smallest ids
+            for (int i = 0; i < deleteNum; i++) {
+                Replica deleteReplica = 
tablet.getReplicaById(sortedReplicaIds.get(i));
+                deleteReplica(tablet, deleteReplica, db.getId(), tableId, 
partitionId, indexId);
+            }
+
+        } finally {
+            db.writeUnlock();
+        }
+    }
+
+    private List<Long> sortReplicaId(List<Replica> replicas) {
+        List<Long> replicaIds = new ArrayList<>();
+        for (Replica replica : replicas) {
+            replicaIds.add(replica.getId());
+        }
+        Collections.sort(replicaIds);
+        return replicaIds;
+    }
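The "always delete the replica whose id is minimum" policy in deleteRedundantReplicas above reduces to sorting the replica ids and dropping the first surplus ones. A self-contained sketch (class and method names hypothetical, not part of this patch):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class RedundantReplicaPicker {
    // Given all replica ids on a tablet and the configured replication
    // number, return the ids to delete: the smallest ids go first, and
    // exactly replicationNum replicas are kept.
    static List<Long> pickReplicasToDelete(List<Long> replicaIds, int replicationNum) {
        List<Long> sorted = new ArrayList<>(replicaIds);
        Collections.sort(sorted);
        int deleteNum = Math.max(0, sorted.size() - replicationNum);
        return sorted.subList(0, deleteNum);
    }
}
```

Deleting by minimum id makes the choice deterministic, so repeated balancer cycles pick the same victims.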
+
+
+    private void deleteReplica(Tablet tablet, Replica deleteReplica, long 
dbId, long tableId, long partitionId, long indexId) {
+        if (tablet.deleteReplica(deleteReplica)) {
+            ReplicaPersistInfo info = ReplicaPersistInfo.createForDelete(dbId, 
tableId, partitionId, indexId, tablet.getId(), deleteReplica.getBackendId());
+            Catalog.getInstance().getEditLog().logDeleteReplica(info);
+
+            Catalog.getInstance().handleJobsWhenDeleteReplica(tableId, 
partitionId, indexId, tablet.getId(), deleteReplica.getId(), 
deleteReplica.getBackendId());
+            LOG.info("delete replica {} for tablet: {}", deleteReplica, 
tablet.getId());
+        }
+    }
+
+    /**
+     * 1. compute the bucket seqs that need to be moved off the removed backend
+     * 2. select a clone destination backendId for each new replica
+     * 3. mark the colocate group balancing
+     * 4. add a supplement clone job
+     * 5. update the backendsPerBucketSeq metadata in ColocateTableIndex
+     */
+    private void balanceForBackendRemoved(Database db, Long groupId, Long 
removedBackendId) {
+        ColocateTableIndex colocateIndex = Catalog.getCurrentColocateIndex();
+        com.google.common.collect.Table<Long, Integer, Long> 
newGroup2BackendsPerBucketSeq = HashBasedTable.create();
+
+        List<List<Long>> backendsPerBucketSeq = 
colocateIndex.getBackendsPerBucketSeq(groupId);
+        List<Integer> needDeleteBucketSeqs = new ArrayList<>();
+
+        int size = backendsPerBucketSeq.size();
+        for (int i = 0; i < size; i++) {
+            List<Long> backends = backendsPerBucketSeq.get(i);
+            if (backends.contains(removedBackendId)) {
+                needDeleteBucketSeqs.add(i);
+            }
+        }
+
+        List<List<Long>> newBackendsPerBucketSeq = 
deepCopy(backendsPerBucketSeq);
+
+        db.readLock();
+        try {
+            List<Long> allTableIds = colocateIndex.getAllTableIds(groupId);
+            for (long tableId : allTableIds) {
+                OlapTable olapTable = (OlapTable) db.getTable(tableId);
+                for (Partition partition : olapTable.getPartitions()) {
+                    int replicateNum = 
olapTable.getPartitionInfo().getReplicationNum(partition.getId());
+                    for (MaterializedIndex index : 
partition.getMaterializedIndices()) {
+                        List<Tablet> tablets = index.getTablets();
+                        for (int i : needDeleteBucketSeqs) {
+                            Tablet tablet = tablets.get(i);
+                            Replica deleteReplica = null;
+                            for (Replica replica : tablet.getReplicas()) {
+                                if (replica.getBackendId() == 
removedBackendId) {
+                                    deleteReplica = replica;
+                                }
+                            }
+
+                            long tabletSizeB = deleteReplica.getDataSize() * 
partition.getMaterializedIndices().size() * olapTable.getPartitions().size() * 
allTableIds.size();
+                            CloneTabletInfo tabletInfo = new 
CloneTabletInfo(db.getId(), tableId, partition.getId(),
+                            index.getId(), tablet.getId(), (short) 
replicateNum, (short) (replicateNum - 1),
+                            tabletSizeB, tablet.getBackendIds());
+
+                            long cloneReplicaBackendId = 
selectCloneBackendIdForRemove(newGroup2BackendsPerBucketSeq, groupId, i, 
db.getClusterName(), tabletInfo);
+
+                            if (cloneReplicaBackendId != -1L) {
+                                if (!colocateIndex.isGroupBalancing(groupId)) {
+                                    colocateIndex.markGroupBalancing(groupId);
+                                    ColocatePersistInfo info = 
ColocatePersistInfo.CreateForMarkBalancing(groupId);
+                                    
Catalog.getInstance().getEditLog().logColocateMarkBalancing(info);
+                                }
+
+                                //update TableColocateIndex groupBucket2BEs
+                                List<Long> backends = 
newBackendsPerBucketSeq.get(i);
+                                backends.remove(removedBackendId);
+                                if (!backends.contains(cloneReplicaBackendId)) 
{
+                                    backends.add(cloneReplicaBackendId);
+                                }
+
+                                AddSupplementJob(tabletInfo, 
cloneReplicaBackendId);
+                            }
+                        }
+                    }
+                }
+            }
+            colocateIndex.addBackendsPerBucketSeq(groupId, 
newBackendsPerBucketSeq);
+            ColocatePersistInfo info = 
ColocatePersistInfo.CreateForBackendsPerBucketSeq(groupId, 
newBackendsPerBucketSeq);
+            
Catalog.getInstance().getEditLog().logColocateBackendsPerBucketSeq(info);
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        } finally {
+            db.readUnlock();
+        }
+    }
+
+    private List<List<Long>> deepCopy(List<List<Long>> backendsPerBucketSeq) {
+        List<List<Long>> newBackendsPerBucketSeq = new ArrayList<>();
+        for (List<Long> backends : backendsPerBucketSeq) {
+            newBackendsPerBucketSeq.add(new ArrayList<>(backends));
+        }
+        return newBackendsPerBucketSeq;
+    }
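deepCopy above copies each inner list as well as the outer one; a shallow copy would share the inner backend lists, so the later backends.remove/add calls would mutate the live ColocateTableIndex state before the edit log is written. A minimal demonstration (demo class name hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

public class DeepCopyDemo {
    // Same shape as the balancer's helper: copy the outer list AND each
    // inner backend list, so mutating the copy leaves the original intact.
    static List<List<Long>> deepCopy(List<List<Long>> backendsPerBucketSeq) {
        List<List<Long>> copy = new ArrayList<>();
        for (List<Long> backends : backendsPerBucketSeq) {
            copy.add(new ArrayList<>(backends)); // copy each inner list too
        }
        return copy;
    }
}
```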
+
+    //this logic is like CloneChecker.checkTabletForSupplement and 
CloneChecker.addCloneJob
+    private Long 
selectCloneBackendIdForRemove(com.google.common.collect.Table<Long, Integer, 
Long> newGroup2BackendsPerBucketSeq, long group, int bucketSeq, String 
clusterName, CloneTabletInfo tabletInfo) {
+        Long cloneReplicaBackendId = newGroup2BackendsPerBucketSeq.get(group, bucketSeq);
+        if (cloneReplicaBackendId == null) {
+            // beId -> BackendInfo
+            final Map<Long, BackendInfo> backendInfosInCluster = 
CloneChecker.getInstance().initBackendInfos(clusterName);
+            if (backendInfosInCluster.isEmpty()) {
+                LOG.warn("failed to init backend infos of cluster: {}", 
clusterName);
+                return -1L;
+            }
+
+            // tablet distribution level
+            final Map<CloneChecker.CapacityLevel, Set<List<Long>>> 
clusterDistributionLevelToBackendIds = 
CloneChecker.getInstance().initBackendDistributionInfos(backendInfosInCluster);
+
+            final Map<CloneChecker.CapacityLevel, Set<List<Long>>> 
clusterCapacityLevelToBackendIds = 
CloneChecker.getInstance().initBackendCapacityInfos(backendInfosInCluster);
+            if (clusterCapacityLevelToBackendIds == null || 
clusterCapacityLevelToBackendIds.isEmpty()) {
+                LOG.warn("failed to init capacity level map of cluster: {}", 
clusterName);
+                return -1L;
+            }
+
+            // select dest backend
+            cloneReplicaBackendId = 
CloneChecker.getInstance().selectCloneReplicaBackendId(
+                    clusterDistributionLevelToBackendIds, 
clusterCapacityLevelToBackendIds,
+                    backendInfosInCluster, tabletInfo, 
CloneJob.JobType.SUPPLEMENT, CloneJob.JobPriority.HIGH);
+
+            if (cloneReplicaBackendId == -1) {
+                LOG.warn("fail to select clone replica backend. tablet: {}", 
tabletInfo);
+                return -1L;
+            }
+
+            newGroup2BackendsPerBucketSeq.put(group, bucketSeq, 
cloneReplicaBackendId);
+        }
+
+        LOG.info("select clone replica dest backend id: {} for group: {} TabletInfo: {}", cloneReplicaBackendId, group, tabletInfo);
+        return cloneReplicaBackendId;
+    }
+
+    /**
+     * balance after a new backend is added
+     *
+     * 1 compute the number of bucket seqs that need to move away from the old backends and
+     *   the number of bucket seqs that need to move to each new backend
+     * 2 select the clone target backend for the new replica
+     * 3 mark the colocate group as balancing
+     * 4 add a migration job
+     * 5 update the ColocateTableIndex's backendsPerBucketSeq
+     *
+     * For example:
+     * There are 3 backends and 4 tablets, and replicateNum is 3.
+     *
+     * The mapping from tablet to backends is as follows:
+     *
+     * tablet1 : [1, 2, 3]
+     * tablet2 : [2, 1, 3]
+     * tablet3 : [3, 2, 1]
+     * tablet4 : [1, 2, 3]
+     *
+     * After adding a new backend:
+     *
+     * needMoveBucketSeqs = 4 * 3 / (3 + 1) = 3
+     * bucketSeqsPerNewBackend = 3 / 1 = 3
+     *
+     * After balancing, the mapping from tablet to backends is as follows:
+     *
+     * tablet1 : [4, 2, 3]
+     * tablet2 : [4, 1, 3]
+     * tablet3 : [4, 2, 1]
+     * tablet4 : [1, 2, 3]
+     *
+     */
+    private void balanceForBackendAdded(Long groupId, Database db, List<Long> 
addedBackendIds) {
+        ColocateTableIndex colocateIndex = Catalog.getCurrentColocateIndex();
+        com.google.common.collect.Table<Long, Integer, Long> 
newGroup2BackendsPerBucketSeq = HashBasedTable.create();
+
+        List<List<Long>> backendsPerBucketSeq = 
colocateIndex.getBackendsPerBucketSeq(groupId);
+        int replicateNum = backendsPerBucketSeq.get(0).size();
+        Set<Long> allGroupBackendIds = 
colocateIndex.getBackendsByGroup(groupId);
+
+        List<List<Long>> newBackendsPerBucketSeq = 
deepCopy(backendsPerBucketSeq);
+
+        int needMoveBucketSeqs = backendsPerBucketSeq.size() * replicateNum / 
(allGroupBackendIds.size() + addedBackendIds.size());
+        int bucketSeqsPerNewBackend = needMoveBucketSeqs / 
addedBackendIds.size();
+        LOG.info("for colocate group {}, needMoveBucketSeqs : {} , 
bucketSeqPerNewBackend: {}", groupId, needMoveBucketSeqs, 
bucketSeqsPerNewBackend);
+
+        db.readLock();
+        try {
+            List<Long> allTableIds = colocateIndex.getAllTableIds(groupId);
+            for (long tableId : allTableIds) {
+                OlapTable olapTable = (OlapTable) db.getTable(tableId);
+                for (Partition partition : olapTable.getPartitions()) {
+                    replicateNum = 
olapTable.getPartitionInfo().getReplicationNum(partition.getId());
+                    for (MaterializedIndex index : 
partition.getMaterializedIndices()) {
+                        List<Tablet> tablets = index.getTablets();
+                        for (int i = 0; i < tablets.size() && i < 
needMoveBucketSeqs; i++) {
+                            Tablet tablet = tablets.get(i);
+                            List<Replica> replicas = tablet.getReplicas();
+                            List<Long> sortedReplicaIds = 
sortReplicaId(replicas);
+                            // always delete the replica with the minimum id
+                            Replica deleteReplica = 
tablet.getReplicaById(sortedReplicaIds.get(0));
+
+                            long tabletSizeB = deleteReplica.getDataSize() * 
partition.getMaterializedIndices().size()
+                             * olapTable.getPartitions().size() * 
allTableIds.size();
+                            CloneTabletInfo tabletInfo = new 
CloneTabletInfo(db.getId(), tableId, partition.getId(),
+                            index.getId(), tablet.getId(), (short) 
replicateNum, (short) replicateNum,
+                            tabletSizeB, tablet.getBackendIds());
+
+                            Long cloneReplicaBackendId = 
newGroup2BackendsPerBucketSeq.get(groupId, i);
+                            if (cloneReplicaBackendId == null) {
+                                cloneReplicaBackendId = addedBackendIds.get(i 
/ bucketSeqsPerNewBackend);
+                                newGroup2BackendsPerBucketSeq.put(groupId, i, 
cloneReplicaBackendId);
+                            }
+
+                            if (!colocateIndex.isGroupBalancing(groupId)) {
+                                colocateIndex.markGroupBalancing(groupId);
+                                ColocatePersistInfo info = 
ColocatePersistInfo.CreateForMarkBalancing(groupId);
+                                
Catalog.getInstance().getEditLog().logColocateMarkBalancing(info);
+                            }
+
+                            //update ColocateTableIndex backendsPerBucketSeq
+                            List<Long> backends = 
newBackendsPerBucketSeq.get(i);
+                            backends.remove(deleteReplica.getBackendId());
+                            if (!backends.contains(cloneReplicaBackendId)) {
+                                backends.add(cloneReplicaBackendId);
+                            }
+
+                            AddMigrationJob(tabletInfo, cloneReplicaBackendId);
+                        }
+                    }
+                }
+            }
+            colocateIndex.addBackendsPerBucketSeq(groupId, 
newBackendsPerBucketSeq);
+            ColocatePersistInfo info = 
ColocatePersistInfo.CreateForBackendsPerBucketSeq(groupId, 
newBackendsPerBucketSeq);
+            
Catalog.getInstance().getEditLog().logColocateBackendsPerBucketSeq(info);
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        } finally {
+            db.readUnlock();
+        }
+    }
+
+    // supplement job: used when a backend goes down or is removed
+    private void AddSupplementJob(CloneTabletInfo tabletInfo, long 
cloneBackendId) {
+        Clone clone = Catalog.getInstance().getCloneInstance();
+        clone.addCloneJob(tabletInfo.getDbId(), tabletInfo.getTableId(), 
tabletInfo.getPartitionId(), tabletInfo.getIndexId(), tabletInfo.getTabletId(), 
cloneBackendId, CloneJob.JobType.SUPPLEMENT, CloneJob.JobPriority.HIGH, 
Config.clone_job_timeout_second * 1000L);
+    }
+
+    // migration job: used when a backend is added
+    private void AddMigrationJob(CloneTabletInfo tabletInfo, long 
cloneBackendId) {
+        Clone clone = Catalog.getInstance().getCloneInstance();
+        clone.addCloneJob(tabletInfo.getDbId(), tabletInfo.getTableId(), 
tabletInfo.getPartitionId(), tabletInfo.getIndexId(), tabletInfo.getTabletId(), 
cloneBackendId, CloneJob.JobType.MIGRATION, CloneJob.JobPriority.HIGH, 
Config.clone_job_timeout_second * 1000L);
+    }
+}
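The balancing javadoc in the diff above derives `needMoveBucketSeqs` and `bucketSeqsPerNewBackend` from the bucket count, replication number, and backend counts. As a sanity check, the arithmetic can be sketched standalone as follows (the class and method names here are illustrative, not the PR's actual API):

```java
// A hedged, standalone sketch of the bucket-move arithmetic from
// balanceForBackendAdded's javadoc. Names are illustrative, not from the PR.
public class ColocateBalanceMath {

    // total bucket seqs that must migrate off the old backends
    static int needMoveBucketSeqs(int bucketNum, int replicateNum,
                                  int oldBackendNum, int addedBackendNum) {
        return bucketNum * replicateNum / (oldBackendNum + addedBackendNum);
    }

    // bucket seqs assigned to each newly added backend
    static int bucketSeqsPerNewBackend(int needMove, int addedBackendNum) {
        return needMove / addedBackendNum;
    }

    public static void main(String[] args) {
        // javadoc example: 4 buckets, replication 3, 3 old backends, 1 new one
        int need = needMoveBucketSeqs(4, 3, 3, 1);      // 12 / 4 = 3
        int perNew = bucketSeqsPerNewBackend(need, 1);  // 3 / 1 = 3
        System.out.println(need + " " + perNew);        // prints "3 3"
    }
}
```

Note that `addedBackendIds.get(i / bucketSeqsPerNewBackend)` in the diff relies on this integer division to spread the moved bucket seqs evenly across the new backends.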
diff --git a/fe/src/main/java/org/apache/doris/common/Config.java 
b/fe/src/main/java/org/apache/doris/common/Config.java
index fa13718a..1e36e74d 100644
--- a/fe/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/src/main/java/org/apache/doris/common/Config.java
@@ -481,6 +481,16 @@
     * If the response time of a query exceeds this threshold, it will be recorded in the audit log as slow_query.
      */
     @ConfField public static long qe_slow_log_ms = 5000;
+    /*
+    * The memory_limit for colocate join PlanFragment instance =
+    * exec_mem_limit / min(query_colocate_join_memory_limit_penalty_factor, instance_num)
+    */
+    @ConfField
+    public static int query_colocate_join_memory_limit_penalty_factor = 8;
+
+    @ConfField
+    public static boolean disable_colocate_join = true;
+
     /*
      * The interval of user resource publishing.
      * User resource contains cgroup configurations of a user.
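The `Config.java` hunk above documents the per-instance memory limit formula for colocate join fragments. A minimal sketch of that formula (the class and method names below are assumptions for illustration, not the PR's code):

```java
// Hedged sketch of the memory-limit formula described in the Config comment:
// memory_limit = exec_mem_limit / min(penalty_factor, instance_num)
public class ColocateMemLimit {

    static long perInstanceLimit(long execMemLimit, int penaltyFactor, int instanceNum) {
        return execMemLimit / Math.min(penaltyFactor, instanceNum);
    }

    public static void main(String[] args) {
        long execMemLimit = 2L * 1024 * 1024 * 1024;  // 2 GB
        // fewer instances than the default factor 8: divide by the instance count
        System.out.println(perInstanceLimit(execMemLimit, 8, 3) == execMemLimit / 3);
        // more instances than the factor: the factor caps the divisor
        System.out.println(perInstanceLimit(execMemLimit, 8, 16) == execMemLimit / 8);
    }
}
```

The `min` keeps a highly parallel plan from shrinking each instance's budget without bound: past `query_colocate_join_memory_limit_penalty_factor` instances, the per-instance limit stops decreasing.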
diff --git a/fe/src/main/java/org/apache/doris/common/ErrorCode.java 
b/fe/src/main/java/org/apache/doris/common/ErrorCode.java
index d283d2af..d7b09f5a 100644
--- a/fe/src/main/java/org/apache/doris/common/ErrorCode.java
+++ b/fe/src/main/java/org/apache/doris/common/ErrorCode.java
@@ -196,7 +196,21 @@
     ERR_WRONG_NAME_FORMAT(5063, new byte[] { '4', '2', '0', '0', '0' },
             "Incorrect %s name '%s'"),
     ERR_COMMON_ERROR(5064, new byte[] { '4', '2', '0', '0', '0' },
-            "%s");
+            "%s"),
+    ERR_COLOCATE_TABLE_DISABLED(5065, new byte[] { '4', '2', '0', '0', '0' },
+            "Colocate table is disabled by Admin"),
+    ERR_COLOCATE_TABLE_NO_EXIT(5066, new byte[] { '4', '2', '0', '0', '0' },
+            "Colocate table '%s' does not exist"),
+    ERR_COLOCATE_TABLE_MUST_OLAP_TABLE(5067, new byte[] { '4', '2', '0', '0', '0' },
+            "Colocate table '%s' must be an OLAP table"),
+    ERR_COLOCATE_TABLE_MUST_SAME_REPLICAT_NUM(5068, new byte[] { '4', '2', '0', '0', '0' },
+            "Colocate tables must have the same replication num: %s"),
+    ERR_COLOCATE_TABLE_MUST_SAME_BUCKNUM(5069, new byte[] { '4', '2', '0', '0', '0' },
+            "Colocate tables must have the same bucket num: %s"),
+    ERR_COLOCATE_TABLE_SAME_DISTRIBUTED_COLUMNS_SIZE(5070, new byte[] { '4', '2', '0', '0', '0' },
+            "Colocate table distribution column count must be the same: %s"),
+    ERR_COLOCATE_TABLE_SAME_DISTRIBUTED_COLUMNS_TYPE(5071, new byte[] { '4', '2', '0', '0', '0' },
+            "Colocate table distribution columns must have the same data type: %s should be %s");
 
 
     ErrorCode(int code, byte[] sqlState, String errorMsg) {
diff --git a/fe/src/main/java/org/apache/doris/common/FeConstants.java 
b/fe/src/main/java/org/apache/doris/common/FeConstants.java
index 03f078d6..06079ab7 100644
--- a/fe/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/src/main/java/org/apache/doris/common/FeConstants.java
@@ -35,5 +35,5 @@
 
     // general model
     // Current meta data version. Use this version to write journals and image
-    public static int meta_version = FeMetaVersion.VERSION_45;
+    public static int meta_version = FeMetaVersion.VERSION_46;
 }
diff --git a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java 
b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
index f8be0a04..5a5e69ed 100644
--- a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -97,4 +97,7 @@
 
     // streaming load
     public static final int VERSION_45 = 45;
+
+    // colocate join
+    public static final int VERSION_46 = 46;
 }
diff --git 
a/fe/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java 
b/fe/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index 87f3ec8b..ecb7b8f6 100644
--- a/fe/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++ b/fe/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -65,6 +65,8 @@
 
     public static final String PROPERTIES_COLUMN_SEPARATOR = 
"column_separator";
     public static final String PROPERTIES_LINE_DELIMITER = "line_delimiter";
+
+    public static final String PROPERTIES_COLOCATE_WITH = "colocate_with";
     
     public static DataProperty analyzeDataProperty(Map<String, String> 
properties, DataProperty oldDataProperty)
             throws AnalysisException {
@@ -341,4 +343,13 @@ public static String analyzeKuduMasterAddr(Map<String, 
String> properties, Strin
 
         return returnAddr;
     }
+
+    public static String analyzeColocate(Map<String, String> properties) 
throws AnalysisException {
+        String colocateTable = null;
+        if (properties != null && 
properties.containsKey(PROPERTIES_COLOCATE_WITH)) {
+            colocateTable = properties.get(PROPERTIES_COLOCATE_WITH);
+            properties.remove(PROPERTIES_COLOCATE_WITH);
+        }
+        return colocateTable;
+    }
 }
diff --git a/fe/src/main/java/org/apache/doris/journal/JournalEntity.java 
b/fe/src/main/java/org/apache/doris/journal/JournalEntity.java
index 70888ee2..1a304cd9 100644
--- a/fe/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -43,6 +43,7 @@
 import org.apache.doris.persist.BackendIdsUpdateInfo;
 import org.apache.doris.persist.CloneInfo;
 import org.apache.doris.persist.ClusterInfo;
+import org.apache.doris.persist.ColocatePersistInfo;
 import org.apache.doris.persist.ConsistencyCheckInfo;
 import org.apache.doris.persist.CreateTableInfo;
 import org.apache.doris.persist.DatabaseInfo;
@@ -57,6 +58,7 @@
 import org.apache.doris.persist.ReplicaPersistInfo;
 import org.apache.doris.persist.TableInfo;
 import org.apache.doris.persist.TruncateTableInfo;
+import org.apache.doris.persist.TablePropertyInfo;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.Frontend;
@@ -367,11 +369,25 @@ public void readFields(DataInput in) throws IOException {
                 data = new Text();
                 break;
             }
+
             case OperationType.OP_TRUNCATE_TABLE: {
                 data = TruncateTableInfo.read(in);
                 needRead = false;
                 break;
             }
+
+            case OperationType.OP_COLOCATE_ADD_TABLE:
+            case OperationType.OP_COLOCATE_REMOVE_TABLE:
+            case OperationType.OP_COLOCATE_BACKENDS_PER_BUCKETSEQ:
+            case OperationType.OP_COLOCATE_MARK_BALANCING:
+            case OperationType.OP_COLOCATE_MARK_STABLE: {
+                data = new ColocatePersistInfo();
+                break;
+            }
+            case OperationType.OP_MODIFY_TABLE_COLOCATE: {
+                data = new TablePropertyInfo();
+                break;
+            }
             default: {
                 IOException e = new IOException();
                 LOG.error("UNKNOWN Operation Type {}", opCode, e);
diff --git a/fe/src/main/java/org/apache/doris/master/ReportHandler.java 
b/fe/src/main/java/org/apache/doris/master/ReportHandler.java
index d66be0cf..c2620771 100644
--- a/fe/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -692,6 +692,11 @@ private static void addReplica(long tabletId, TTabletInfo 
backendTabletInfo, lon
                 throw new MetaNotFoundException("table[" + tableId + "] does 
not exist");
             }
 
+            // a colocate table's replicas are deleted from meta during balancing
+            if (olapTable.getColocateTable() != null) {
+                return;
+            }
+
             Partition partition = olapTable.getPartition(partitionId);
             if (partition == null) {
                 throw new MetaNotFoundException("partition[" + partitionId + 
"] does not exist");
diff --git a/fe/src/main/java/org/apache/doris/persist/ColocatePersistInfo.java 
b/fe/src/main/java/org/apache/doris/persist/ColocatePersistInfo.java
new file mode 100644
index 00000000..2b49eb4c
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/persist/ColocatePersistInfo.java
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.persist;
+
+import org.apache.doris.common.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * PersistInfo for ColocateTableIndex
+ */
+public class ColocatePersistInfo implements Writable {
+    private long tableId;
+    private long groupId;
+    private long dbId;
+    private List<List<Long>> backendsPerBucketSeq;
+
+    public ColocatePersistInfo() {
+
+    }
+
+    public static ColocatePersistInfo CreateForAddTable(long tableId, long 
groupId, long dbId, List<List<Long>> backendsPerBucketSeq) {
+        return new ColocatePersistInfo(tableId, groupId, dbId, 
backendsPerBucketSeq);
+    }
+
+    public static ColocatePersistInfo CreateForBackendsPerBucketSeq(long 
groupId, List<List<Long>> backendsPerBucketSeq) {
+        return new ColocatePersistInfo(-1L, groupId, -1L, 
backendsPerBucketSeq);
+    }
+
+    public static ColocatePersistInfo CreateForMarkBalancing(long groupId) {
+        return new ColocatePersistInfo(-1L, groupId, -1L, new ArrayList<>());
+    }
+
+    public static ColocatePersistInfo CreateForMarkStable(long groupId) {
+        return new ColocatePersistInfo(-1L, groupId, -1L, new ArrayList<>());
+    }
+
+    public static ColocatePersistInfo CreateForRemoveTable(long tableId) {
+        return new ColocatePersistInfo(tableId, -1L, -1L, new ArrayList<>());
+    }
+
+    public ColocatePersistInfo(long tableId, long groupId, long dbId, 
List<List<Long>> backendsPerBucketSeq) {
+        this.tableId = tableId;
+        this.groupId = groupId;
+        this.dbId = dbId;
+        this.backendsPerBucketSeq = backendsPerBucketSeq;
+    }
+
+    public long getTableId() {
+        return tableId;
+    }
+
+    public void setTableId(long tableId) {
+        this.tableId = tableId;
+    }
+
+    public long getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(long groupId) {
+        this.groupId = groupId;
+    }
+
+    public long getDbId() {
+        return dbId;
+    }
+
+    public void setDbId(long dbId) {
+        this.dbId = dbId;
+    }
+
+    public List<List<Long>> getBackendsPerBucketSeq() {
+        return backendsPerBucketSeq;
+    }
+
+    public void setBackendsPerBucketSeq(List<List<Long>> backendsPerBucketSeq) 
{
+        this.backendsPerBucketSeq = backendsPerBucketSeq;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeLong(tableId);
+        out.writeLong(groupId);
+        out.writeLong(dbId);
+        int size = backendsPerBucketSeq.size();
+        out.writeInt(size);
+        for (List<Long> beList : backendsPerBucketSeq) {
+            out.writeInt(beList.size());
+            for (Long be : beList) {
+                out.writeLong(be);
+            }
+        }
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        tableId = in.readLong();
+        groupId = in.readLong();
+        dbId = in.readLong();
+
+        int size = in.readInt();
+        backendsPerBucketSeq = new ArrayList<>();
+        for (int i = 0; i < size; i++) {
+            int beListSize = in.readInt();
+            List<Long> beLists = new ArrayList<>();
+            for (int j = 0; j < beListSize; j++) {
+                beLists.add(in.readLong());
+            }
+            backendsPerBucketSeq.add(beLists);
+        }
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+
+        if (!(obj instanceof ColocatePersistInfo)) {
+            return false;
+        }
+
+        ColocatePersistInfo info = (ColocatePersistInfo) obj;
+
+        return tableId == info.tableId
+                && groupId == info.groupId
+                && dbId == info.dbId
+                && backendsPerBucketSeq.equals(info.backendsPerBucketSeq);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("table id: ").append(tableId);
+        sb.append(" group id: ").append(groupId);
+        sb.append(" db id: ").append(dbId);
+        sb.append(" backendsPerBucketSeq: ").append(backendsPerBucketSeq);
+        return sb.toString();
+    }
+}
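`ColocatePersistInfo.write`/`readFields` above serialize the nested `backendsPerBucketSeq` as length-prefixed lists: an outer count, then for each bucket seq an inner count followed by the backend ids. A round-trip sketch of just that layout (the class name is illustrative, not the PR's serializer, and it omits the tableId/groupId/dbId header fields):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hedged sketch of the length-prefixed list-of-lists layout used for
// backendsPerBucketSeq. Not the PR's actual class.
public class BucketSeqCodec {

    static byte[] write(List<List<Long>> backendsPerBucketSeq) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            out.writeInt(backendsPerBucketSeq.size());   // outer list size
            for (List<Long> beList : backendsPerBucketSeq) {
                out.writeInt(beList.size());             // per-bucket list size
                for (Long be : beList) {
                    out.writeLong(be);                   // backend id
                }
            }
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static List<List<Long>> read(byte[] bytes) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
            int size = in.readInt();
            List<List<Long>> seq = new ArrayList<>();
            for (int i = 0; i < size; i++) {
                int beListSize = in.readInt();
                List<Long> beList = new ArrayList<>();
                for (int j = 0; j < beListSize; j++) {
                    beList.add(in.readLong());
                }
                seq.add(beList);
            }
            return seq;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<List<Long>> seq = Arrays.asList(Arrays.asList(1L, 2L, 3L),
                                             Arrays.asList(2L, 1L, 3L));
        System.out.println(seq.equals(read(write(seq))));  // prints "true"
    }
}
```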
diff --git a/fe/src/main/java/org/apache/doris/persist/EditLog.java 
b/fe/src/main/java/org/apache/doris/persist/EditLog.java
index ac48b1ba..42dadf9e 100644
--- a/fe/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/src/main/java/org/apache/doris/persist/EditLog.java
@@ -601,11 +601,43 @@ public static void loadJournal(Catalog catalog, 
JournalEntity journal) {
                     
catalog.getBackupHandler().getRepoMgr().removeRepo(repoName, true);
                     break;
                 }
+
                 case OperationType.OP_TRUNCATE_TABLE: {
                     TruncateTableInfo info = (TruncateTableInfo) 
journal.getData();
                     catalog.replayTruncateTable(info);
                     break;
                 }
+
+                case OperationType.OP_COLOCATE_ADD_TABLE: {
+                    final ColocatePersistInfo info = (ColocatePersistInfo) 
journal.getData();
+                    
catalog.getColocateTableIndex().replayAddTableToGroup(info);
+                    break;
+                }
+                case OperationType.OP_COLOCATE_REMOVE_TABLE: {
+                    final ColocatePersistInfo info = (ColocatePersistInfo) 
journal.getData();
+                    catalog.getColocateTableIndex().replayRemoveTable(info);
+                    break;
+                }
+                case OperationType.OP_COLOCATE_BACKENDS_PER_BUCKETSEQ: {
+                    final ColocatePersistInfo info = (ColocatePersistInfo) 
journal.getData();
+                    
catalog.getColocateTableIndex().replayAddBackendsPerBucketSeq(info);
+                    break;
+                }
+                case OperationType.OP_COLOCATE_MARK_BALANCING: {
+                    final ColocatePersistInfo info = (ColocatePersistInfo) 
journal.getData();
+                    
catalog.getColocateTableIndex().replayMarkGroupBalancing(info);
+                    break;
+                }
+                case OperationType.OP_COLOCATE_MARK_STABLE: {
+                    final ColocatePersistInfo info = (ColocatePersistInfo) 
journal.getData();
+                    
catalog.getColocateTableIndex().replayMarkGroupStable(info);
+                    break;
+                }
+                case OperationType.OP_MODIFY_TABLE_COLOCATE: {
+                    final TablePropertyInfo info = (TablePropertyInfo) journal.getData();
+                    catalog.replayModifyTableColocate(info);
+                    break;
+                }
+
                 default: {
                     IOException e = new IOException();
                     LOG.error("UNKNOWN Operation Type {}", opCode, e);
@@ -1064,4 +1096,28 @@ public void logUpdateUserProperty(UserPropertyInfo 
propertyInfo) {
     public void logTruncateTable(TruncateTableInfo info) {
         logEdit(OperationType.OP_TRUNCATE_TABLE, info);
     }
+
+    public void logColocateAddTable(ColocatePersistInfo info) {
+        logEdit(OperationType.OP_COLOCATE_ADD_TABLE, info);
+    }
+
+    public void logColocateRemoveTable(ColocatePersistInfo info) {
+        logEdit(OperationType.OP_COLOCATE_REMOVE_TABLE, info);
+    }
+
+    public void logColocateBackendsPerBucketSeq(ColocatePersistInfo info) {
+        logEdit(OperationType.OP_COLOCATE_BACKENDS_PER_BUCKETSEQ, info);
+    }
+
+    public void logColocateMarkBalancing(ColocatePersistInfo info) {
+        logEdit(OperationType.OP_COLOCATE_MARK_BALANCING, info);
+    }
+
+    public void logColocateMarkStable(ColocatePersistInfo info) {
+        logEdit(OperationType.OP_COLOCATE_MARK_STABLE, info);
+    }
+
+    public void logModifyTableColocate(TablePropertyInfo info) {
+        logEdit(OperationType.OP_MODIFY_TABLE_COLOCATE, info);
+    }
 }
diff --git a/fe/src/main/java/org/apache/doris/persist/OperationType.java 
b/fe/src/main/java/org/apache/doris/persist/OperationType.java
index 6fdad60b..6924b6dc 100644
--- a/fe/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/src/main/java/org/apache/doris/persist/OperationType.java
@@ -132,6 +132,14 @@
     public static final short OP_CREATE_REPOSITORY = 89;
     public static final short OP_DROP_REPOSITORY = 90;
 
+    //colocate table
+    public static final short OP_COLOCATE_ADD_TABLE = 94;
+    public static final short OP_COLOCATE_REMOVE_TABLE = 95;
+    public static final short OP_COLOCATE_BACKENDS_PER_BUCKETSEQ = 96;
+    public static final short OP_COLOCATE_MARK_BALANCING = 97;
+    public static final short OP_COLOCATE_MARK_STABLE = 98;
+    public static final short OP_MODIFY_TABLE_COLOCATE = 99;
+
     //real time load 100 -108
     public static final short OP_UPSERT_TRANSACTION_STATE = 100;
     public static final short OP_DELETE_TRANSACTION_STATE = 101;
diff --git a/fe/src/main/java/org/apache/doris/persist/TablePropertyInfo.java 
b/fe/src/main/java/org/apache/doris/persist/TablePropertyInfo.java
new file mode 100644
index 00000000..fdb80929
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/persist/TablePropertyInfo.java
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.persist;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import com.google.common.collect.Maps;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * PersistInfo for Table properties
+ */
+public class TablePropertyInfo implements Writable {
+    private long dbId;
+    private long tableId;
+    private Map<String, String> propertyMap;
+
+    public TablePropertyInfo() {
+
+    }
+
+    public TablePropertyInfo(long dbId, long tableId, Map<String, String> 
propertyMap) {
+        this.dbId = dbId;
+        this.tableId = tableId;
+        this.propertyMap = propertyMap;
+    }
+
+    public Map<String, String> getPropertyMap() {
+        return propertyMap;
+    }
+
+    public void setPropertyMap(Map<String, String> propertyMap) {
+        this.propertyMap = propertyMap;
+    }
+
+    public long getDbId() {
+        return dbId;
+    }
+
+    public void setDbId(long dbId) {
+        this.dbId = dbId;
+    }
+
+    public long getTableId() {
+        return tableId;
+    }
+
+    public void setTableId(long tableId) {
+        this.tableId = tableId;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeLong(dbId);
+        out.writeLong(tableId);
+        int size = propertyMap.size();
+        out.writeInt(size);
+        for (Map.Entry<String, String> kv : propertyMap.entrySet()) {
+            Text.writeString(out, kv.getKey());
+            Text.writeString(out, kv.getValue());
+        }
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        dbId = in.readLong();
+        tableId = in.readLong();
+
+        int size = in.readInt();
+        propertyMap = Maps.newHashMap();
+        for (int i = 0; i < size; i++) {
+            String key = Text.readString(in);
+            String value = Text.readString(in);
+            propertyMap.put(key, value);
+        }
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+
+        if (!(obj instanceof TablePropertyInfo)) {
+            return false;
+        }
+
+        TablePropertyInfo info = (TablePropertyInfo) obj;
+
+        return dbId == info.dbId && tableId == info.tableId && 
propertyMap.equals(info.propertyMap);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("db id: ").append(dbId);
+        sb.append(" table id: ").append(tableId);
+        sb.append(" propertyMap: ").append(propertyMap);
+        return sb.toString();
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/planner/DistributedPlanner.java 
b/fe/src/main/java/org/apache/doris/planner/DistributedPlanner.java
index c8844a0b..be5617da 100644
--- a/fe/src/main/java/org/apache/doris/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/doris/planner/DistributedPlanner.java
@@ -22,10 +22,19 @@
 import org.apache.doris.analysis.InsertStmt;
 import org.apache.doris.analysis.JoinOperator;
 import org.apache.doris.analysis.QueryStmt;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.ColocateTableIndex;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.DistributionInfo;
+import org.apache.doris.catalog.HashDistributionInfo;
+import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.Pair;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.thrift.TPartitionType;
 
 import com.google.common.base.Preconditions;
@@ -188,7 +197,7 @@ private PlanFragment createPlanFragments(
         } else if (root instanceof HashJoinNode) {
             Preconditions.checkState(childFragments.size() == 2);
             result = createHashJoinFragment((HashJoinNode) root, 
childFragments.get(1),
-                    childFragments.get(0), perNodeMemLimit);
+                    childFragments.get(0), perNodeMemLimit, fragments);
         } else if (root instanceof CrossJoinNode) {
             result = createCrossJoinFragment((CrossJoinNode) root, 
childFragments.get(1),
                     childFragments.get(0));
@@ -274,7 +283,8 @@ private PlanFragment createScanFragment(PlanNode node) {
      * don't create a broadcast join if we already anticipate that this will exceed the query's memory budget.
      */
     private PlanFragment createHashJoinFragment(HashJoinNode node, PlanFragment rightChildFragment,
-                                                PlanFragment leftChildFragment, long perNodeMemLimit)
+                                                PlanFragment leftChildFragment, long perNodeMemLimit,
+                                                ArrayList<PlanFragment> fragments)
             throws UserException {
         // broadcast: send the rightChildFragment's output to each node executing
         // the leftChildFragment; the cost across all nodes is proportional to the
@@ -331,6 +341,16 @@ private PlanFragment createHashJoinFragment(HashJoinNode node, PlanFragment righ
             doBroadcast = false;
         }
 
+        if (canColocateJoin(node, leftChildFragment, rightChildFragment)) {
+            node.setColocate(true);
+            //node.setDistributionMode(HashJoinNode.DistributionMode.PARTITIONED);
+            node.setChild(0, leftChildFragment.getPlanRoot());
+            node.setChild(1, rightChildFragment.getPlanRoot());
+            leftChildFragment.setPlanRoot(node);
+            fragments.remove(rightChildFragment);
+            return leftChildFragment;
+        }
+
         if (doBroadcast) {
             node.setDistributionMode(HashJoinNode.DistributionMode.BROADCAST);
             // Doesn't create a new fragment, but modifies leftChildFragment to execute
@@ -395,6 +415,86 @@ private PlanFragment createHashJoinFragment(HashJoinNode node, PlanFragment righ
         }
     }
 
+    private boolean canColocateJoin(HashJoinNode node, PlanFragment leftChildFragment, PlanFragment rightChildFragment) {
+        if (Config.disable_colocate_join) {
+            return false;
+        }
+
+        if (ConnectContext.get().getSessionVariable().isDisableColocateJoin()) {
+            return false;
+        }
+
+        PlanNode leftRoot = leftChildFragment.getPlanRoot();
+        PlanNode rightRoot = rightChildFragment.getPlanRoot();
+
+        //leftRoot should be ScanNode or HashJoinNode, rightRoot should be ScanNode
+        if (leftRoot instanceof OlapScanNode && rightRoot instanceof OlapScanNode) {
+            return canColocateJoin(node, leftRoot, rightRoot);
+        }
+
+        if (leftRoot instanceof HashJoinNode && rightRoot instanceof OlapScanNode) {
+            while (leftRoot instanceof HashJoinNode) {
+                if (((HashJoinNode)leftRoot).isColocate()) {
+                    leftRoot = leftRoot.getChild(0);
+                } else {
+                    return false;
+                }
+            }
+            if (leftRoot instanceof OlapScanNode) {
+                return canColocateJoin(node, leftRoot, rightRoot);
+            }
+        }
+
+        return false;
+    }
+
+    //the table must be colocate
+    //the colocate group must be stable
+    //the eqJoinConjuncts must contain the distributionColumns
+    private boolean canColocateJoin(HashJoinNode node, PlanNode leftRoot, PlanNode rightRoot) {
+        OlapTable leftTable = ((OlapScanNode) leftRoot).getOlapTable();
+        OlapTable rightTable = ((OlapScanNode) rightRoot).getOlapTable();
+
+        //1 the table must be colocate
+        if (leftTable.getColocateTable() == null
+                || !leftTable.getColocateTable().equalsIgnoreCase(rightTable.getColocateTable())) {
+            return false;
+        }
+
+        //2 the colocate group must be stable
+        ColocateTableIndex colocateIndex = Catalog.getCurrentColocateIndex();
+        long groupId = colocateIndex.getGroup(leftTable.getId());
+        if (colocateIndex.isGroupBalancing(groupId)) {
+            LOG.warn("colocate group {} is balancing", leftTable.getColocateTable());
+            return false;
+        }
+
+        DistributionInfo leftDistribution = leftTable.getDefaultDistributionInfo();
+        DistributionInfo rightDistribution = rightTable.getDefaultDistributionInfo();
+
+        if (leftDistribution instanceof HashDistributionInfo && rightDistribution instanceof HashDistributionInfo) {
+            List<Column> leftColumns = ((HashDistributionInfo) leftDistribution).getDistributionColumns();
+            List<Column> rightColumns = ((HashDistributionInfo) rightDistribution).getDistributionColumns();
+
+            List<Pair<Expr, Expr>> eqJoinConjuncts = node.getEqJoinConjuncts();
+            for (Pair<Expr, Expr> eqJoinPredicate : eqJoinConjuncts) {
+                if (eqJoinPredicate.first.unwrapSlotRef() == null || eqJoinPredicate.second.unwrapSlotRef() == null) {
+                    continue;
+                }
+
+                SlotDescriptor leftSlot = eqJoinPredicate.first.unwrapSlotRef().getDesc();
+                SlotDescriptor rightSlot = eqJoinPredicate.second.unwrapSlotRef().getDesc();
+
+                //3 the eqJoinConjuncts must contain the distributionColumns
+                if (leftColumns.contains(leftSlot.getColumn()) && rightColumns.contains(rightSlot.getColumn())) {
+                    return true;
+                }
+            }
+        }
+
+        return false;
+    }
+
     /**
      * Modifies the leftChildFragment to execute a cross join. The right child input is provided by an ExchangeNode,
      * which is the destination of the rightChildFragment's output.
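The eligibility rule enforced by the two `canColocateJoin` overloads above can be restated as a standalone check. The sketch below is illustrative only: the class, method, and parameter names (`ColocateCheckSketch`, `canColocate`, `eqJoinPairs`, and the sample group/column values) are hypothetical stand-ins for the real `OlapTable`, `ColocateTableIndex`, and `Pair<Expr, Expr>` machinery in the patch.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the three rules the patch checks before planning a
// colocate join: (1) both tables share a colocate group (case-insensitive, as in
// the patch), (2) the group is not rebalancing, and (3) at least one equi-join
// predicate pairs the two tables' hash-distribution columns.
public class ColocateCheckSketch {
    static boolean canColocate(String leftGroup, String rightGroup, boolean balancing,
                               List<String> leftDistCols, List<String> rightDistCols,
                               List<String[]> eqJoinPairs) {
        if (leftGroup == null || !leftGroup.equalsIgnoreCase(rightGroup)) {
            return false; // rule 1: the tables must be in the same colocate group
        }
        if (balancing) {
            return false; // rule 2: the colocate group must be stable
        }
        for (String[] pair : eqJoinPairs) { // rule 3: join keys cover distribution columns
            if (leftDistCols.contains(pair[0]) && rightDistCols.contains(pair[1])) {
                return true; // one matching predicate suffices, as in the patch
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // t1 JOIN t2 ON t1.id = t2.id, both hash-distributed by id, same group
        boolean ok = canColocate("group1", "GROUP1", false,
                Arrays.asList("id"), Arrays.asList("id"),
                Arrays.asList(new String[][]{{"id", "id"}}));
        System.out.println(ok); // true: colocate join is possible
    }
}
```

Note that, like the patch, the sketch returns true as soon as any one equi-join predicate matches a distribution-column pair; it does not require every distribution column to appear among the join keys.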
diff --git a/fe/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/src/main/java/org/apache/doris/planner/HashJoinNode.java
index 31681866..fa976c55 100644
--- a/fe/src/main/java/org/apache/doris/planner/HashJoinNode.java
+++ b/fe/src/main/java/org/apache/doris/planner/HashJoinNode.java
@@ -58,6 +58,7 @@
     private  List<Expr> otherJoinConjuncts;
     private boolean isPushDown;
     private DistributionMode distrMode;
+    private boolean isColocate = false; //the flag for colocate join
 
     public HashJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, TableRef innerRef,
                         List<Pair<Expr, Expr>> eqJoinConjuncts, List<Expr> otherJoinConjuncts) {
@@ -111,6 +112,14 @@ public void setDistributionMode(DistributionMode distrMode) {
         this.distrMode = distrMode;
     }
 
+    public boolean isColocate() {
+        return isColocate;
+    }
+
+    public void setColocate(boolean colocate) {
+        isColocate = colocate;
+    }
+
     @Override
     public void init(Analyzer analyzer) throws UserException {
         assignConjuncts(analyzer);
diff --git a/fe/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 9ff17926..24345132 100644
--- a/fe/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.planner;
 
+import com.google.common.collect.ArrayListMultimap;
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.BaseTableRef;
 import org.apache.doris.analysis.Expr;
@@ -99,6 +100,9 @@
 
     private HashSet<Long> scanBackendIds = new HashSet<>();
 
+    private Map<Long, Integer> tabletId2BucketSeq = Maps.newHashMap();
+    public ArrayListMultimap<Integer, TScanRangeLocations> bucketSeq2locations = ArrayListMultimap.create();
+
     /**
      * Constructs node to scan given data files of table 'tbl'.
      */
@@ -125,6 +129,10 @@ public void setCanTurnOnPreAggr(boolean canChangePreAggr) {
         this.canTurnOnPreAggr = canChangePreAggr;
     }
 
+    public OlapTable getOlapTable() {
+        return olapTable;
+    }
+
     @Override
     protected String debugString() {
         ToStringHelper helper = Objects.toStringHelper(this);
@@ -479,6 +487,9 @@ private void addScanRangeLocations(Partition partition,
             TScanRange scanRange = new TScanRange();
             scanRange.setPalo_scan_range(paloRange);
             scanRangeLocations.setScan_range(scanRange);
+
+            bucketSeq2locations.put(tabletId2BucketSeq.get(tabletId), scanRangeLocations);
+
             result.add(scanRangeLocations);
         }
     }
@@ -565,6 +576,8 @@ private void getScanRangeLocations(Analyzer analyzer) throws UserException, Anal
             List<Tablet> tablets = new ArrayList<Tablet>();
             Collection<Long> tabletIds = distributionPrune(selectedTable, partition.getDistributionInfo());
             LOG.debug("distribution prune tablets: {}", tabletIds);
+
+            List<Long> allTabletIds = selectedTable.getTabletIdsInOrder();
             if (tabletIds != null) {
                 for (Long id : tabletIds) {
                     tablets.add(selectedTable.getTablet(id));
@@ -572,6 +585,11 @@ private void getScanRangeLocations(Analyzer analyzer) throws UserException, Anal
             } else {
                 tablets.addAll(selectedTable.getTablets());
             }
+
+            for (int i = 0; i < allTabletIds.size(); i++) {
+                tabletId2BucketSeq.put(allTabletIds.get(i), i);
+            }
+
             totalTabletsNum += selectedTable.getTablets().size();
             selectedTabletsNum += tablets.size();
             addScanRangeLocations(partition, selectedTable, tablets, localBeId);
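The OlapScanNode changes above number each tablet by its position in the table's ordered tablet list (its bucket sequence) and group scan ranges by that sequence. A minimal standalone sketch of that bookkeeping, with hypothetical class and method names (`BucketSeqSketch`, `buildTabletToBucketSeq`) and illustrative tablet ids:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the tabletId2BucketSeq bookkeeping: each tablet id maps
// to its index in the table's ordered tablet list. Two colocated tables with the
// same bucket count produce matching sequences, which is what lets bucket i of
// one table be joined locally against bucket i of the other.
public class BucketSeqSketch {
    // mirrors the loop over selectedTable.getTabletIdsInOrder() in the patch
    static Map<Long, Integer> buildTabletToBucketSeq(List<Long> tabletIdsInOrder) {
        Map<Long, Integer> tabletId2BucketSeq = new HashMap<>();
        for (int i = 0; i < tabletIdsInOrder.size(); i++) {
            tabletId2BucketSeq.put(tabletIdsInOrder.get(i), i);
        }
        return tabletId2BucketSeq;
    }

    public static void main(String[] args) {
        List<Long> ordered = new ArrayList<>(List.of(1001L, 1002L, 1003L));
        Map<Long, Integer> seq = buildTabletToBucketSeq(ordered);
        System.out.println(seq.get(1003L)); // 2: the third tablet belongs to bucket 2
    }
}
```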
diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java
index e1a37add..cfbe7b93 100644
--- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -31,6 +31,7 @@
 import org.apache.doris.planner.DataSink;
 import org.apache.doris.planner.DataStreamSink;
 import org.apache.doris.planner.ExchangeNode;
+import org.apache.doris.planner.HashJoinNode;
 import org.apache.doris.planner.MysqlScanNode;
 import org.apache.doris.planner.OlapScanNode;
 import org.apache.doris.planner.PlanFragment;
@@ -87,9 +88,11 @@
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
@@ -393,6 +396,7 @@ public void exec() throws Exception {
             // execute all instances from up to bottom
             int backendId = 0;
             int profileFragmentId = 0;
+            long memoryLimit = queryOptions.getMem_limit();
             for (PlanFragment fragment : fragments) {
                 FragmentExecParams params = fragmentExecParamsMap.get(fragment.getFragmentId());
                 
@@ -401,6 +405,17 @@ public void exec() throws Exception {
                 Preconditions.checkState(instanceNum > 0);
                 List<TExecPlanFragmentParams> tParams = params.toThrift(backendId);
                 List<Pair<BackendExecState, Future<PExecPlanFragmentResult>>> futures = Lists.newArrayList();
+
+                //update memory limit for colocate join
+                if (colocateFragmentIds.contains(fragment.getFragmentId().asInt())) {
+                    int rate = Math.min(Config.query_colocate_join_memory_limit_penalty_factor, instanceNum);
+                    long newmemory = memoryLimit / rate;
+
+                    for (TExecPlanFragmentParams tParam : tParams) {
+                        tParam.query_options.setMem_limit(newmemory);
+                    }
+                }
+
                 int instanceId = 0;
                 for (TExecPlanFragmentParams tParam : tParams) {
                     // TODO: pool of pre-formatted BackendExecStates?
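The memory adjustment in the hunk above divides the query's mem_limit among colocate-fragment instances. A minimal sketch of that arithmetic; the class and method names (`ColocateMemLimitSketch`, `adjustedMemLimit`) and the sample values (2 GB limit, penalty factor 8, 10 instances) are illustrative assumptions, not part of the patch:

```java
// Hypothetical sketch of the per-instance memory adjustment for a colocate-join
// fragment: each instance's mem_limit becomes the query limit divided by
// min(query_colocate_join_memory_limit_penalty_factor, instanceNum), since a
// colocate fragment runs one instance per bucket group on each backend.
public class ColocateMemLimitSketch {
    static long adjustedMemLimit(long memoryLimit, int penaltyFactor, int instanceNum) {
        int rate = Math.min(penaltyFactor, instanceNum); // same formula as the patch
        return memoryLimit / rate;
    }

    public static void main(String[] args) {
        // 2147483648 / min(8, 10) = 268435456, i.e. 256 MB per instance
        System.out.println(adjustedMemLimit(2L << 30, 8, 10));
    }
}
```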
@@ -821,19 +836,31 @@ private void computeFragmentHosts() throws Exception {
                 continue;
             }
 
-            Iterator iter = fragmentExecParamsMap.get(fragment.getFragmentId())
-                          .scanRangeAssignment.entrySet().iterator();
-            while (iter.hasNext()) {
-                Map.Entry entry = (Map.Entry) iter.next();
-                TNetworkAddress key = (TNetworkAddress) entry.getKey();
-                Map<Integer, List<TScanRangeParams>> value =
-                        (Map<Integer, List<TScanRangeParams>>) entry.getValue();
-                FInstanceExecParam instanceParam = new FInstanceExecParam(null, key,
-                        0, params);
-                for (Integer planNodeId : value.keySet()) {
-                    instanceParam.perNodeScanRanges.put(planNodeId, value.get(planNodeId));
+            //for ColocateJoin fragment
+            if (bucketSeqToAddress.size() > 0 && isColocateJoin(fragment.getPlanRoot())) {
+                for (Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>> scanRanges : bucketSeqToScanRange.entrySet()) {
+                    FInstanceExecParam instanceParam = new FInstanceExecParam(null, bucketSeqToAddress.get(scanRanges.getKey()), 0, params);
+
+                    Map<Integer, List<TScanRangeParams>> nodeScanRanges = scanRanges.getValue();
+                    for (Map.Entry<Integer, List<TScanRangeParams>> nodeScanRange : nodeScanRanges.entrySet()) {
+                        instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), nodeScanRange.getValue());
+                    }
+
+                    params.instanceExecParams.add(instanceParam);
+                }
+            } else {
+                //normal fragment
+                Iterator iter = fragmentExecParamsMap.get(fragment.getFragmentId()).scanRangeAssignment.entrySet().iterator();
+                while (iter.hasNext()) {
+                    Map.Entry entry = (Map.Entry) iter.next();
+                    TNetworkAddress key = (TNetworkAddress) entry.getKey();
+                    Map<Integer, List<TScanRangeParams>> value = (Map<Integer, List<TScanRangeParams>>) entry.getValue();
+                    FInstanceExecParam instanceParam = new FInstanceExecParam(null, key, 0, params);
+                    for (Integer planNodeId : value.keySet()) {
+                        instanceParam.perNodeScanRanges.put(planNodeId, value.get(planNodeId));
+                    }
+                    params.instanceExecParams.add(instanceParam);
                 }
-                params.instanceExecParams.add(instanceParam);
             }
 
             if (params.instanceExecParams.isEmpty()) {
@@ -849,6 +876,36 @@ private void computeFragmentHosts() throws Exception {
             }
         }
     }
+
+    //One fragment can have only one HashJoinNode
+    private boolean isColocateJoin(PlanNode node) {
+        if (Config.disable_colocate_join) {
+            return false;
+        }
+
+        if (ConnectContext.get().getSessionVariable().isDisableColocateJoin()) {
+            return false;
+        }
+
+        //cache the colocateFragmentIds
+        if (colocateFragmentIds.contains(node.getFragmentId().asInt())) {
+            return true;
+        }
+
+        if (node instanceof HashJoinNode) {
+            HashJoinNode joinNode = (HashJoinNode) node;
+            if (joinNode.isColocate()) {
+                colocateFragmentIds.add(joinNode.getFragmentId().asInt());
+                return true;
+            }
+        }
+
+        for (PlanNode childNode : node.getChildren()) {
+            return isColocateJoin(childNode);
+        }
+
+        return false;
+    }
     
     private void computeFragmentExecParamsForParallelExec() throws Exception {
         // create exec params and set instance_id, host, per_node_scan_ranges
@@ -1188,17 +1245,58 @@ private void computeScanRangeAssignment() throws Exception {
 
             FragmentScanRangeAssignment assignment =
                     fragmentExecParamsMap.get(scanNode.getFragmentId()).scanRangeAssignment;
-            computeScanRangeAssignment(scanNode.getId(), locations, assignment);
+            if (isColocateJoin(scanNode.getFragment().getPlanRoot())) {
+                computeScanRangeAssignmentByColocate((OlapScanNode) scanNode, assignment);
+            } else {
+                computeScanRangeAssignmentByScheduler(scanNode, locations, assignment);
+            }
         }
     }
 
-    // Does a scan range assignment (returned in 'assignment') based on a list
-    // of scan range locations for a particular node.
-    // If exec_at_coord is true, all scan ranges will be assigned to the coord node.
-    private void computeScanRangeAssignment(
-            final PlanNodeId nodeId,
+    //To ensure tablets with the same bucketSeq are assigned to the same execHostPort
+    private void computeScanRangeAssignmentByColocate(
+            final OlapScanNode scanNode,
+            FragmentScanRangeAssignment assignment) throws Exception {
+
+        for(Integer bucketSeq: scanNode.bucketSeq2locations.keySet()) {
+            //fill scanRangeParamsList
+            List<TScanRangeLocations> locations = scanNode.bucketSeq2locations.get(bucketSeq);
+            if (!bucketSeqToAddress.containsKey(bucketSeq)) {
+                getExecHostPortForBucketSeq(locations.get(0), bucketSeq);
+            }
+
+            for(TScanRangeLocations location: locations) {
+                Map<Integer, List<TScanRangeParams>> scanRanges =
+                        findOrInsert(bucketSeqToScanRange, bucketSeq, new HashMap<Integer, List<TScanRangeParams>>());
+
+                List<TScanRangeParams> scanRangeParamsList =
+                        findOrInsert(scanRanges, scanNode.getId().asInt(), new ArrayList<TScanRangeParams>());
+
+                // add scan range
+                TScanRangeParams scanRangeParams = new TScanRangeParams();
+                scanRangeParams.scan_range = location.scan_range;
+                scanRangeParamsList.add(scanRangeParams);
+            }
+
+        }
+    }
+
+    private void getExecHostPortForBucketSeq(TScanRangeLocations seqLocation, Integer bucketSeq) throws Exception {
+        int randomLocation = new Random().nextInt(seqLocation.locations.size());
+        Reference<Long> backendIdRef = new Reference<Long>();
+        TNetworkAddress execHostPort = SimpleScheduler.getHost(seqLocation.locations.get(randomLocation).backend_id, seqLocation.locations, this.idToBackend, backendIdRef);
+        if (execHostPort == null) {
+            throw new UserException("there is no scanNode Backend");
+        }
+        this.addressToBackendID.put(execHostPort, backendIdRef.getRef());
+        this.bucketSeqToAddress.put(bucketSeq, execHostPort);
+    }
+
+    private void computeScanRangeAssignmentByScheduler(
+            final ScanNode scanNode,
             final List<TScanRangeLocations> locations,
             FragmentScanRangeAssignment assignment) throws Exception {
+
         HashMap<TNetworkAddress, Long> assignedBytesPerHost = Maps.newHashMap();
         for (TScanRangeLocations scanRangeLocations : locations) {
             // assign this scan range to the host w/ the fewest assigned bytes
@@ -1224,9 +1322,9 @@ private void computeScanRangeAssignment(
             this.addressToBackendID.put(execHostPort, backendIdRef.getRef());
 
             Map<Integer, List<TScanRangeParams>> scanRanges = findOrInsert(assignment, execHostPort,
-                new HashMap<Integer, List<TScanRangeParams>>());
-            List<TScanRangeParams> scanRangeParamsList =
-                findOrInsert(scanRanges, nodeId.asInt(), new ArrayList<TScanRangeParams>());
+                    new HashMap<Integer, List<TScanRangeParams>>());
+            List<TScanRangeParams> scanRangeParamsList = findOrInsert(scanRanges, scanNode.getId().asInt(),
+                    new ArrayList<TScanRangeParams>());
             // add scan range
             TScanRangeParams scanRangeParams = new TScanRangeParams();
             scanRangeParams.scan_range = scanRangeLocations.scan_range;
@@ -1353,6 +1451,15 @@ public boolean isDone() {
             extends HashMap<TNetworkAddress, Map<Integer, List<TScanRangeParams>>> {
     }
 
+    class BucketSeqToScanRange extends HashMap<Integer, Map<Integer, List<TScanRangeParams>>> {
+
+    }
+
+    private BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange();
+    private Map<Integer, TNetworkAddress> bucketSeqToAddress = Maps.newHashMap();
+    private Set<Integer> colocateFragmentIds = new HashSet<>();
+
+
     // record backend execute state
     // TODO(zhaochun): add profile information and others
     public class BackendExecState {
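The Coordinator changes above bind each bucket sequence to a single backend address the first time it is seen (`getExecHostPortForBucketSeq`) and route all later scan ranges for that bucket to the same address. A minimal sketch of that invariant; the class and method names (`BucketAddressSketch`, `hostForBucket`) and the `be1:9060` / `be2:9060` addresses are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the bucketSeqToAddress invariant: a bucket sequence is
// bound to one backend on first sight and reused for every later scan range of
// that bucket, so matching buckets of both join sides execute on the same host.
public class BucketAddressSketch {
    private final Map<Integer, String> bucketSeqToAddress = new HashMap<>();

    // bind on first sight, reuse afterwards (computeIfAbsent mirrors the
    // containsKey check in computeScanRangeAssignmentByColocate)
    String hostForBucket(int bucketSeq, String candidateHost) {
        return bucketSeqToAddress.computeIfAbsent(bucketSeq, k -> candidateHost);
    }

    public static void main(String[] args) {
        BucketAddressSketch sketch = new BucketAddressSketch();
        String first = sketch.hostForBucket(0, "be1:9060");  // binds bucket 0 to be1
        String second = sketch.hostForBucket(0, "be2:9060"); // candidate ignored, be1 reused
        System.out.println(first + " " + second); // be1:9060 be1:9060
    }
}
```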
diff --git a/fe/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/src/main/java/org/apache/doris/qe/SessionVariable.java
index 15e83cd1..1116b727 100644
--- a/fe/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -66,6 +66,7 @@
     public static final int MIN_EXEC_MEM_LIMIT = 2097152;   
     public static final String BATCH_SIZE = "batch_size";
     public static final String DISABLE_STREAMING_PREAGGREGATIONS = "disable_streaming_preaggregations";
+    public static final String DISABLE_COLOCATE_JOIN = "disable_colocate_join";
     public static final String MT_DOP = "mt_dop";
 
     // max memory used on every backend.
@@ -172,7 +173,10 @@
     private int batchSize = 1024;
 
     @VariableMgr.VarAttr(name = DISABLE_STREAMING_PREAGGREGATIONS)
-    private boolean disableStreamPreaggregations = false; 
+    private boolean disableStreamPreaggregations = false;
+
+    @VariableMgr.VarAttr(name = DISABLE_COLOCATE_JOIN)
+    private boolean disableColocateJoin = false;
 
     public long getMaxExecMemByte() {
         return maxExecMemByte;
@@ -405,6 +409,14 @@ public int getMtDop() {
     public void setMtDop(int mtDop) {
         this.mtDop = mtDop;
     }
+
+    public boolean isDisableColocateJoin() {
+        return disableColocateJoin;
+    }
+
+    public void setDisableColocateJoin(boolean disableColocateJoin) {
+        this.disableColocateJoin = disableColocateJoin;
+    }
     
    // Serialize to thrift object 
     TQueryOptions toThrift() {


 
