This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ab11dea98d [Enhancement](config) optimize behavior of
default_storage_medium (#20739)
ab11dea98d is described below
commit ab11dea98d5b9d2ad02a97dd7c833819b19bdf2d
Author: Shiyuan Ji <[email protected]>
AuthorDate: Thu Jul 20 22:00:11 2023 +0800
[Enhancement](config) optimize behavior of default_storage_medium (#20739)
---
docs/en/docs/faq/install-faq.md | 2 +-
docs/zh-CN/docs/faq/install-faq.md | 2 +-
.../java/org/apache/doris/backup/RestoreJob.java | 2 +-
.../org/apache/doris/catalog/DataProperty.java | 10 +++
.../java/org/apache/doris/catalog/OlapTable.java | 2 +-
.../doris/common/util/DynamicPartitionUtil.java | 7 +-
.../apache/doris/common/util/PropertyAnalyzer.java | 20 +++--
.../apache/doris/datasource/InternalCatalog.java | 90 ++++++++++++----------
.../org/apache/doris/system/SystemInfoService.java | 13 +++-
.../org/apache/doris/backup/RestoreJobTest.java | 3 +-
.../CreateTableElasticOnStorageMediumTest.java | 83 ++++++++++++++++++++
.../apache/doris/catalog/ModifyBackendTest.java | 2 +-
.../datasource/RoundRobinCreateTabletTest.java | 2 +-
.../doris/load/sync/canal/CanalSyncDataTest.java | 2 +-
.../apache/doris/system/SystemInfoServiceTest.java | 2 +-
15 files changed, 180 insertions(+), 62 deletions(-)
diff --git a/docs/en/docs/faq/install-faq.md b/docs/en/docs/faq/install-faq.md
index af78496c78..81a816c135 100644
--- a/docs/en/docs/faq/install-faq.md
+++ b/docs/en/docs/faq/install-faq.md
@@ -163,7 +163,7 @@ In many cases, we need to troubleshoot problems through
logs. The format and vie
Doris supports one BE node to configure multiple storage paths. Usually, one
storage path can be configured for each disk. At the same time, Doris supports
storage media properties that specify paths, such as SSD or HDD. SSD stands for
high-speed storage device and HDD stands for low-speed storage device.
-If doris cluster has only one storage medium type, the practice is not specify
storage medium in be.conf configuration file. ```Failed to find enough host
with storage medium and tag```, generally we got this error for only config SSD
medium in be.conf, but default parameter ```default_storage_medium``` in fe is
HDD, so there is no HDD storage medium in cluster. There are several ways to
fix this, one is modify the parameter in fe.conf and restart fe; the other way
is take the SSD config [...]
+If the cluster only has one type of medium, such as all HDD or all SSD, the
best practice is not to explicitly specify the medium property in be.conf. If
encountering the error ```Failed to find enough host with storage medium and
tag``` mentioned above, it is generally because be.conf only configures the SSD
medium, while the table creation stage explicitly specifies ```properties
{"storage_medium" = "hdd"}```; similarly, if be.conf only configures the HDD
medium, and the table creation [...]
By specifying the storage medium properties of the path, we can take advantage
of Doris's hot and cold data partition storage function to store hot data in
SSD at the partition level, while cold data is automatically transferred to HDD.
diff --git a/docs/zh-CN/docs/faq/install-faq.md
b/docs/zh-CN/docs/faq/install-faq.md
index 4dca37ae39..d9d2c78e42 100644
--- a/docs/zh-CN/docs/faq/install-faq.md
+++ b/docs/zh-CN/docs/faq/install-faq.md
@@ -163,7 +163,7 @@ Observer 角色和这个单词的含义一样,仅仅作为观察者来同步
Doris支持一个BE节点配置多个存储路径。通常情况下,每块盘配置一个存储路径即可。同时,Doris支持指定路径的存储介质属性,如SSD或HDD。SSD代表高速存储设备,HDD代表低速存储设备。
-如果集群只有一种介质比如都是HDD或者都是SSD,最佳实践是不用在be.conf中显式指定介质属性。如果遇到上述报错```Failed to find
enough host with storage medium and
tag```,一般是因为be.conf中只配置了SSD的介质,而fe中参数default_storage_medium默认为HDD,因此建表时会发现没有HDD介质的存储而报错。解决方案可以修改此FE配置并重启FE生效;或者将be.conf中SSD的显式配置去掉;或者建表时增加properties参数
```properties {"storage_medium" = "ssd"}```均可
+如果集群只有一种介质比如都是HDD或者都是SSD,最佳实践是不用在be.conf中显式指定介质属性。如果遇到上述报错```Failed to find
enough host with storage medium and
tag```,一般是因为be.conf中只配置了SSD的介质,而建表阶段中显式指定了```properties {"storage_medium" =
"hdd"}```;同理如果be.conf只配置了HDD的介质,而建表阶段中显式指定了```properties {"storage_medium" =
"ssd"}```也会出现上述错误。解决方案可以修改建表的properties参数与配置匹配;或者将be.conf中SSD/HDD的显式配置去掉即可。
通过指定路径的存储介质属性,我们可以利用Doris的冷热数据分区存储功能,在分区级别将热数据存储在SSD中,而冷数据会自动转移到HDD中。
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index a0237f03d8..6bd606a378 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -1091,7 +1091,7 @@ public class RestoreJob extends AbstractJob {
// replicas
try {
Map<Tag, List<Long>> beIds = Env.getCurrentSystemInfo()
- .selectBackendIdsForReplicaCreation(replicaAlloc,
null);
+ .selectBackendIdsForReplicaCreation(replicaAlloc,
null, false, false);
for (Map.Entry<Tag, List<Long>> entry : beIds.entrySet()) {
for (Long beId : entry.getValue()) {
long newReplicaId = env.getNextId();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/DataProperty.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/DataProperty.java
index 6137af9424..7028e5e449 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DataProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DataProperty.java
@@ -49,6 +49,7 @@ public class DataProperty implements Writable,
GsonPostProcessable {
private String storagePolicy;
@SerializedName(value = "isMutable")
private boolean isMutable = true;
+ private boolean storageMediumSpecified;
private DataProperty() {
// for persist
@@ -97,6 +98,10 @@ public class DataProperty implements Writable,
GsonPostProcessable {
return storagePolicy;
}
+ public boolean isStorageMediumSpecified() {
+ return storageMediumSpecified;
+ }
+
public boolean isMutable() {
return isMutable;
}
@@ -105,6 +110,10 @@ public class DataProperty implements Writable,
GsonPostProcessable {
isMutable = mutable;
}
+ public void setStorageMediumSpecified(boolean isSpecified) {
+ storageMediumSpecified = isSpecified;
+ }
+
public static DataProperty read(DataInput in) throws IOException {
if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_108) {
String json = Text.readString(in);
@@ -164,4 +173,5 @@ public class DataProperty implements Writable,
GsonPostProcessable {
// storagePolicy is a newly added field, it may be null when replaying
from old version.
this.storagePolicy = Strings.nullToEmpty(this.storagePolicy);
}
+
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 086c4bb7ec..27c1a99c68 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -578,7 +578,7 @@ public class OlapTable extends Table {
try {
Map<Tag, List<Long>> tag2beIds =
Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(
- replicaAlloc, null);
+ replicaAlloc, null, false, false);
for (Map.Entry<Tag, List<Long>> entry3 :
tag2beIds.entrySet()) {
for (Long beId : entry3.getValue()) {
long newReplicaId = env.getNextId();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java
index 7d0e5833fb..54c0594482 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java
@@ -228,7 +228,7 @@ public class DynamicPartitionUtil {
ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_REPLICATION_NUM_FORMAT,
val);
}
ReplicaAllocation replicaAlloc = new
ReplicaAllocation(Short.valueOf(val));
-
Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc,
null);
+
Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc,
null, false, true);
}
private static void checkReplicaAllocation(ReplicaAllocation replicaAlloc,
int hotPartitionNum,
@@ -237,13 +237,14 @@ public class DynamicPartitionUtil {
ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_REPLICATION_NUM_ZERO);
}
-
Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc,
null);
+
Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc,
null, false, true);
if (hotPartitionNum <= 0) {
return;
}
try {
-
Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc,
TStorageMedium.SSD);
+
Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc,
TStorageMedium.SSD, false,
+ true);
} catch (DdlException e) {
throw new DdlException("Failed to find enough backend for ssd
storage medium. When setting "
+ DynamicPartitionProperty.HOT_PARTITION_NUM + " > 0, the
hot partitions will store "
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index beb6d3b893..dd137077db 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -140,12 +140,6 @@ public class PropertyAnalyzer {
public static final String
PROPERTIES_ENABLE_DUPLICATE_WITHOUT_KEYS_BY_DEFAULT =
"enable_duplicate_without_keys_by_default";
-
- private static final Logger LOG =
LogManager.getLogger(PropertyAnalyzer.class);
- private static final String COMMA_SEPARATOR = ",";
- private static final double MAX_FPP = 0.05;
- private static final double MIN_FPP = 0.0001;
-
// For unique key data model, the feature Merge-on-Write will leverage a
primary
// key index and a delete-bitmap to mark duplicate keys as deleted in load
stage,
// which can avoid the merging cost in read stage, and accelerate the
aggregation
@@ -153,6 +147,10 @@ public class PropertyAnalyzer {
// For the detail design, see the
[DISP-018](https://cwiki.apache.org/confluence/
//
display/DORIS/DSIP-018%3A+Support+Merge-On-Write+implementation+for+UNIQUE+KEY+data+model)
public static final String ENABLE_UNIQUE_KEY_MERGE_ON_WRITE =
"enable_unique_key_merge_on_write";
+ private static final Logger LOG =
LogManager.getLogger(PropertyAnalyzer.class);
+ private static final String COMMA_SEPARATOR = ",";
+ private static final double MAX_FPP = 0.05;
+ private static final double MIN_FPP = 0.0001;
/**
* check and replace members of DataProperty by properties.
@@ -172,6 +170,7 @@ public class PropertyAnalyzer {
long cooldownTimestamp = oldDataProperty.getCooldownTimeMs();
String newStoragePolicy = oldDataProperty.getStoragePolicy();
boolean hasStoragePolicy = false;
+ boolean storageMediumSpecified = false;
for (Map.Entry<String, String> entry : properties.entrySet()) {
String key = entry.getKey();
@@ -179,8 +178,10 @@ public class PropertyAnalyzer {
if (key.equalsIgnoreCase(PROPERTIES_STORAGE_MEDIUM)) {
if (value.equalsIgnoreCase(TStorageMedium.SSD.name())) {
storageMedium = TStorageMedium.SSD;
+ storageMediumSpecified = true;
} else if (value.equalsIgnoreCase(TStorageMedium.HDD.name())) {
storageMedium = TStorageMedium.HDD;
+ storageMediumSpecified = true;
} else {
throw new AnalysisException("Invalid storage medium: " +
value);
}
@@ -247,7 +248,12 @@ public class PropertyAnalyzer {
boolean mutable = PropertyAnalyzer.analyzeBooleanProp(properties,
PROPERTIES_MUTABLE, true);
properties.remove(PROPERTIES_MUTABLE);
- return new DataProperty(storageMedium, cooldownTimestamp,
newStoragePolicy, mutable);
+ DataProperty dataProperty = new DataProperty(storageMedium,
cooldownTimestamp, newStoragePolicy, mutable);
+ // check the state of data property
+ if (storageMediumSpecified) {
+ dataProperty.setStorageMediumSpecified(true);
+ }
+ return dataProperty;
}
public static short analyzeShortKeyColumnCount(Map<String, String>
properties) throws AnalysisException {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 8e996a110b..9e1e3a8396 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -222,19 +222,14 @@ public class InternalCatalog implements
CatalogIf<Database> {
fullNameToDb.put(db.getFullName(), db);
}
- @Override
- public long getId() {
- return INTERNAL_CATALOG_ID;
- }
-
@Override
public String getType() {
return "internal";
}
@Override
- public String getComment() {
- return "Doris internal catalog";
+ public long getId() {
+ return INTERNAL_CATALOG_ID;
}
@Override
@@ -247,6 +242,10 @@ public class InternalCatalog implements
CatalogIf<Database> {
return Lists.newArrayList(fullNameToDb.keySet());
}
+ public List<Long> getDbIds() {
+ return Lists.newArrayList(idToDb.keySet());
+ }
+
@Nullable
@Override
public Database getDbNullable(String dbName) {
@@ -278,16 +277,6 @@ public class InternalCatalog implements
CatalogIf<Database> {
return idToDb.get(dbId);
}
- public TableName getTableNameByTableId(Long tableId) {
- for (Database db : fullNameToDb.values()) {
- Table table = db.getTableNullable(tableId);
- if (table != null) {
- return new TableName("", db.getFullName(), table.getName());
- }
- }
- return null;
- }
-
@Override
public Map<String, String> getProperties() {
return Maps.newHashMap();
@@ -303,6 +292,21 @@ public class InternalCatalog implements
CatalogIf<Database> {
LOG.warn("Ignore the modify catalog props in build-in catalog.");
}
+ @Override
+ public String getComment() {
+ return "Doris internal catalog";
+ }
+
+ public TableName getTableNameByTableId(Long tableId) {
+ for (Database db : fullNameToDb.values()) {
+ Table table = db.getTableNullable(tableId);
+ if (table != null) {
+ return new TableName("", db.getFullName(), table.getName());
+ }
+ }
+ return null;
+ }
+
// Use tryLock to avoid potential dead lock
private boolean tryLock(boolean mustLock) {
while (true) {
@@ -336,10 +340,6 @@ public class InternalCatalog implements
CatalogIf<Database> {
}
}
- public List<Long> getDbIds() {
- return Lists.newArrayList(idToDb.keySet());
- }
-
public List<Database> getDbs() {
return Lists.newArrayList(idToDb.values());
}
@@ -1225,7 +1225,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
ColumnDef columnDef;
if (resultExpr.getSrcSlotRef() == null) {
columnDef = new ColumnDef(name, typeDef, false, null,
- true, false, new DefaultValue(false, null), "");
+ true, false, new DefaultValue(false, null), "");
} else {
Column column =
resultExpr.getSrcSlotRef().getDesc().getColumn();
boolean setDefault =
StringUtils.isNotBlank(column.getDefaultValue());
@@ -1517,7 +1517,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy,
idGeneratorBuffer,
olapTable.disableAutoCompaction(),
olapTable.enableSingleReplicaCompaction(),
olapTable.skipWriteIndexOnLoad(),
olapTable.storeRowColumn(), olapTable.isDynamicSchema(),
- binlogConfig);
+ binlogConfig, dataProperty.isStorageMediumSpecified());
// check again
olapTable = db.getOlapTableOrDdlException(tableName);
@@ -1744,7 +1744,8 @@ public class InternalCatalog implements
CatalogIf<Database> {
DataSortInfo dataSortInfo, boolean enableUniqueKeyMergeOnWrite,
String storagePolicy,
IdGeneratorBuffer idGeneratorBuffer, boolean disableAutoCompaction,
boolean enableSingleReplicaCompaction, boolean
skipWriteIndexOnLoad,
- boolean storeRowColumn, boolean isDynamicSchema, BinlogConfig
binlogConfig) throws DdlException {
+ boolean storeRowColumn, boolean isDynamicSchema, BinlogConfig
binlogConfig,
+ boolean isStorageMediumSpecified) throws DdlException {
// create base index first.
Preconditions.checkArgument(baseIndexId != -1);
MaterializedIndex baseIndex = new MaterializedIndex(baseIndexId,
IndexState.NORMAL);
@@ -1783,7 +1784,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
int schemaHash = indexMeta.getSchemaHash();
TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId,
indexId, schemaHash, storageMedium);
createTablets(clusterName, index, ReplicaState.NORMAL,
distributionInfo, version, replicaAlloc, tabletMeta,
- tabletIdSet, idGeneratorBuffer);
+ tabletIdSet, idGeneratorBuffer, isStorageMediumSpecified);
boolean ok = false;
String errMsg = null;
@@ -2190,7 +2191,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
List<Column> rollupColumns =
Env.getCurrentEnv().getMaterializedViewHandler()
.checkAndPrepareMaterializedView(addRollupClause,
olapTable, baseRollupIndex, false);
short rollupShortKeyColumnCount =
Env.calcShortKeyColumnCount(rollupColumns, alterClause.getProperties(),
-
true/*isKeysRequired*/);
+ true/*isKeysRequired*/);
int rollupSchemaHash = Util.generateSchemaHash();
long rollupIndexId = idGeneratorBuffer.getNextId();
olapTable.setIndexMeta(rollupIndexId,
addRollupClause.getRollupName(), rollupColumns, schemaVersion,
@@ -2269,7 +2270,6 @@ public class InternalCatalog implements
CatalogIf<Database> {
"Database " + db.getFullName() + " create
unpartitioned table " + tableName + " increasing "
+ totalReplicaNum + " of replica exceeds
quota[" + db.getReplicaQuota() + "]");
}
- // create partition
Partition partition =
createPartitionWithIndices(db.getClusterName(), db.getId(), olapTable.getId(),
olapTable.getBaseIndexId(), partitionId,
partitionName, olapTable.getIndexIdToMeta(),
partitionDistributionInfo,
partitionInfo.getDataProperty(partitionId).getStorageMedium(),
@@ -2278,7 +2278,8 @@ public class InternalCatalog implements
CatalogIf<Database> {
olapTable.getDataSortInfo(),
olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy,
idGeneratorBuffer, olapTable.disableAutoCompaction(),
olapTable.enableSingleReplicaCompaction(),
skipWriteIndexOnLoad,
- storeRowColumn, isDynamicSchema, binlogConfigForTask);
+ storeRowColumn, isDynamicSchema, binlogConfigForTask,
+
partitionInfo.getDataProperty(partitionId).isStorageMediumSpecified());
olapTable.addPartition(partition);
} else if (partitionInfo.getType() == PartitionType.RANGE
|| partitionInfo.getType() == PartitionType.LIST) {
@@ -2329,22 +2330,23 @@ public class InternalCatalog implements
CatalogIf<Database> {
&& !Strings.isNullOrEmpty(partionStoragePolicy)) {
throw new AnalysisException(
"Can not create UNIQUE KEY table that enables
Merge-On-write"
- + " with storage policy(" +
partionStoragePolicy + ")");
+ + " with storage policy(" +
partionStoragePolicy + ")");
}
if (!partionStoragePolicy.equals("")) {
storagePolicy = partionStoragePolicy;
}
Env.getCurrentEnv().getPolicyMgr().checkStoragePolicyExist(storagePolicy);
- Partition partition =
createPartitionWithIndices(db.getClusterName(), db.getId(), olapTable.getId(),
- olapTable.getBaseIndexId(), entry.getValue(),
entry.getKey(), olapTable.getIndexIdToMeta(),
- partitionDistributionInfo,
dataProperty.getStorageMedium(),
-
partitionInfo.getReplicaAllocation(entry.getValue()), versionInfo, bfColumns,
bfFpp,
- tabletIdSet, olapTable.getCopiedIndexes(),
isInMemory, storageFormat,
- partitionInfo.getTabletType(entry.getValue()),
compressionType,
+
+ Partition partition =
createPartitionWithIndices(db.getClusterName(), db.getId(),
+ olapTable.getId(), olapTable.getBaseIndexId(),
entry.getValue(), entry.getKey(),
+ olapTable.getIndexIdToMeta(),
partitionDistributionInfo,
+ dataProperty.getStorageMedium(),
partitionInfo.getReplicaAllocation(entry.getValue()),
+ versionInfo, bfColumns, bfFpp, tabletIdSet,
olapTable.getCopiedIndexes(), isInMemory,
+ storageFormat,
partitionInfo.getTabletType(entry.getValue()), compressionType,
olapTable.getDataSortInfo(),
olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy,
idGeneratorBuffer,
olapTable.disableAutoCompaction(),
- olapTable.enableSingleReplicaCompaction(),
skipWriteIndexOnLoad,
- storeRowColumn, isDynamicSchema,
binlogConfigForTask);
+ olapTable.enableSingleReplicaCompaction(),
skipWriteIndexOnLoad, storeRowColumn,
+ isDynamicSchema, binlogConfigForTask,
dataProperty.isStorageMediumSpecified());
olapTable.addPartition(partition);
}
} else {
@@ -2380,7 +2382,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
// register or remove table from DynamicPartition after table
created
DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(),
olapTable, false);
Env.getCurrentEnv().getDynamicPartitionScheduler()
- .executeDynamicPartitionFirstTime(db.getId(),
olapTable.getId());
+ .executeDynamicPartitionFirstTime(db.getId(),
olapTable.getId());
Env.getCurrentEnv().getDynamicPartitionScheduler()
.createOrUpdateRuntimeInfo(tableId,
DynamicPartitionScheduler.LAST_UPDATE_TIME,
TimeUtils.getCurrentFormatTime());
@@ -2528,7 +2530,8 @@ public class InternalCatalog implements
CatalogIf<Database> {
@VisibleForTesting
public void createTablets(String clusterName, MaterializedIndex index,
ReplicaState replicaState,
DistributionInfo distributionInfo, long version, ReplicaAllocation
replicaAlloc, TabletMeta tabletMeta,
- Set<Long> tabletIdSet, IdGeneratorBuffer idGeneratorBuffer) throws
DdlException {
+ Set<Long> tabletIdSet, IdGeneratorBuffer idGeneratorBuffer,
boolean isStorageMediumSpecified)
+ throws DdlException {
ColocateTableIndex colocateIndex = Env.getCurrentColocateIndex();
Map<Tag, List<List<Long>>> backendsPerBucketSeq = null;
GroupId groupId = null;
@@ -2588,10 +2591,12 @@ public class InternalCatalog implements
CatalogIf<Database> {
} else {
if (!Config.disable_storage_medium_check) {
chosenBackendIds = Env.getCurrentSystemInfo()
-
.selectBackendIdsForReplicaCreation(replicaAlloc,
tabletMeta.getStorageMedium());
+
.selectBackendIdsForReplicaCreation(replicaAlloc, tabletMeta.getStorageMedium(),
+ isStorageMediumSpecified, false);
} else {
chosenBackendIds = Env.getCurrentSystemInfo()
-
.selectBackendIdsForReplicaCreation(replicaAlloc, null);
+
.selectBackendIdsForReplicaCreation(replicaAlloc, null,
+ isStorageMediumSpecified, false);
}
}
@@ -2764,7 +2769,8 @@ public class InternalCatalog implements
CatalogIf<Database> {
olapTable.getPartitionInfo().getDataProperty(oldPartitionId).getStoragePolicy(),
idGeneratorBuffer, olapTable.disableAutoCompaction(),
olapTable.enableSingleReplicaCompaction(),
olapTable.skipWriteIndexOnLoad(),
- olapTable.storeRowColumn(),
olapTable.isDynamicSchema(), binlogConfig);
+ olapTable.storeRowColumn(),
olapTable.isDynamicSchema(), binlogConfig,
+
copiedTbl.getPartitionInfo().getDataProperty(oldPartitionId).isStorageMediumSpecified());
newPartitions.add(newPartition);
}
} catch (DdlException e) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index 12483ff2f7..ace77c1ca8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -528,11 +528,14 @@ public class SystemInfoService {
*
* @param replicaAlloc
* @param storageMedium
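+ * @param isStorageMediumSpecified true if the storage medium was explicitly specified; when false, selection may retry with the other storage medium
+ * @param isOnlyForCheck set to true if the call is only used to check for available backends (no retry with the other storage medium)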
* @return return the selected backend ids group by tag.
* @throws DdlException
*/
public Map<Tag, List<Long>> selectBackendIdsForReplicaCreation(
- ReplicaAllocation replicaAlloc, TStorageMedium storageMedium)
+ ReplicaAllocation replicaAlloc, TStorageMedium storageMedium,
boolean isStorageMediumSpecified,
+ boolean isOnlyForCheck)
throws DdlException {
Map<Long, Backend> copiedBackends = Maps.newHashMap(idToBackendRef);
Map<Tag, List<Long>> chosenBackendIds = Maps.newHashMap();
@@ -557,6 +560,14 @@ public class SystemInfoService {
BeSelectionPolicy policy = builder.build();
List<Long> beIds = selectBackendIdsByPolicy(policy,
entry.getValue());
+ // first time empty, retry with different storage medium
+ // if only for check, no need to retry different storage
medium to get backend
+ if (beIds.isEmpty() && storageMedium != null &&
!isStorageMediumSpecified && !isOnlyForCheck) {
+ storageMedium = (storageMedium == TStorageMedium.HDD) ?
TStorageMedium.SSD : TStorageMedium.HDD;
+ policy = builder.setStorageMedium(storageMedium).build();
+ beIds = selectBackendIdsByPolicy(policy, entry.getValue());
+ }
+ // after retry different storage medium, it's still empty
if (beIds.isEmpty()) {
LOG.error("failed backend(s) for policy:" + policy);
String errorReplication = "replication tag: " +
entry.getKey()
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java
b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java
index 5317c122e7..dcc76fdfcd 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java
@@ -153,7 +153,8 @@ public class RestoreJobTest {
new Expectations() {
{
-
systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any,
(TStorageMedium) any);
+
systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any,
(TStorageMedium) any,
+ false, true);
minTimes = 0;
result = new Delegate() {
public synchronized List<Long>
selectBackendIdsForReplicaCreation(
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableElasticOnStorageMediumTest.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableElasticOnStorageMediumTest.java
new file mode 100644
index 0000000000..8b09b4b65b
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableElasticOnStorageMediumTest.java
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+import org.apache.doris.common.DdlException;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TStorageMedium;
+import org.apache.doris.utframe.TestWithFeService;
+
+import org.junit.jupiter.api.Assertions;
+
+import java.util.List;
+
+public class CreateTableElasticOnStorageMediumTest extends TestWithFeService {
+
+ @Override
+ protected void runAfterAll() throws Exception {
+ Env.getCurrentEnv().clear();
+ }
+
+ public void setStorageMediumToSSDTest() throws Exception {
+ SystemInfoService clusterInfo = Env.getCurrentEnv().getClusterInfo();
+ List<Backend> allBackends = clusterInfo.getAllBackends();
+ // set all backends' storage medium to SSD
+ for (Backend backend : allBackends) {
+ if (backend.hasPathHash()) {
+ backend.getDisks().values().stream()
+ .peek(diskInfo ->
diskInfo.setStorageMedium(TStorageMedium.SSD));
+ }
+ }
+ createDatabase("db1");
+
+ String sql1 = "CREATE TABLE IF NOT EXISTS db1.t1 (pk INT, v1 INT sum)
AGGREGATE KEY (pk) "
+ + "DISTRIBUTED BY HASH(pk) BUCKETS 1 PROPERTIES
('replication_num' = '1');";
+ Assertions.assertDoesNotThrow(() -> createTables(sql1));
+ String sql2 = "CREATE TABLE IF NOT EXISTS db1.t2 (pk INT, v1 INT sum)
AGGREGATE KEY (pk) "
+ + "DISTRIBUTED BY HASH(pk) BUCKETS 1 PROPERTIES
('replication_num' = '1', 'storage_medium' = 'ssd');";
+ Assertions.assertDoesNotThrow(() -> createTables(sql2));
+ String sql3 = "CREATE TABLE IF NOT EXISTS db1.t3 (pk INT, v1 INT sum)
AGGREGATE KEY (pk) "
+ + "DISTRIBUTED BY HASH(pk) BUCKETS 1 PROPERTIES
('replication_num' = '1', 'storage_medium' = 'hdd');";
+ Assertions.assertThrows(DdlException.class, () -> createTables(sql3));
+ }
+
+ public void setStorageMediumToHDDTest() throws Exception {
+ SystemInfoService clusterInfo = Env.getCurrentEnv().getClusterInfo();
+ List<Backend> allBackends = clusterInfo.getAllBackends();
+ // set all backends' storage medium to HDD
+ for (Backend backend : allBackends) {
+ if (backend.hasPathHash()) {
+ backend.getDisks().values().stream()
+ .peek(diskInfo ->
diskInfo.setStorageMedium(TStorageMedium.HDD));
+ }
+ }
+ createDatabase("db1");
+
+ String sql1 = "CREATE TABLE IF NOT EXISTS db1.t4 (pk INT, v1 INT sum)
AGGREGATE KEY (pk) "
+ + "DISTRIBUTED BY HASH(pk) BUCKETS 1 PROPERTIES
('replication_num' = '1');";
+ Assertions.assertDoesNotThrow(() -> createTables(sql1));
+ String sql2 = "CREATE TABLE IF NOT EXISTS db1.t5 (pk INT, v1 INT sum)
AGGREGATE KEY (pk) "
+ + "DISTRIBUTED BY HASH(pk) BUCKETS 1 PROPERTIES
('replication_num' = '1', 'storage_medium' = 'hdd');";
+ Assertions.assertDoesNotThrow(() -> createTables(sql2));
+ String sql3 = "CREATE TABLE IF NOT EXISTS db1.t6 (pk INT, v1 INT sum)
AGGREGATE KEY (pk) "
+ + "DISTRIBUTED BY HASH(pk) BUCKETS 1 PROPERTIES
('replication_num' = '1', 'storage_medium' = 'ssd');";
+ Assertions.assertThrows(DdlException.class, () -> createTables(sql3));
+ }
+
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java
index 891c48b75f..6020369d50 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java
@@ -82,7 +82,7 @@ public class ModifyBackendTest {
CreateTableStmt createStmt = (CreateTableStmt)
UtFrameUtils.parseAndAnalyzeStmt(createStr, connectContext);
ExceptionChecker.expectThrowsWithMsg(DdlException.class, "Failed to
find enough backend, please check the replication num,replication tag and
storage medium.\n"
+ "Create failed replications:\n"
- + "replication tag: {\"location\" : \"default\"},
replication num: 1, storage medium: HDD",
+ + "replication tag: {\"location\" : \"default\"},
replication num: 1, storage medium: SSD",
() -> DdlExecutor.execute(Env.getCurrentEnv(), createStmt));
createStr = "create table test.tbl1(\n" + "k1 int\n" + ") distributed
by hash(k1)\n" + "buckets 3 properties(\n"
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/RoundRobinCreateTabletTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/RoundRobinCreateTabletTest.java
index 029ce462dd..fb65a13e74 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/RoundRobinCreateTabletTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/RoundRobinCreateTabletTest.java
@@ -113,7 +113,7 @@ public class RoundRobinCreateTabletTest {
try {
Env.getCurrentEnv().getInternalCatalog().createTablets(clusterName, index,
ReplicaState.NORMAL,
distributionInfo, 0, replicaAlloc, tabletMeta,
- tabletIdSet, idGeneratorBuffer);
+ tabletIdSet, idGeneratorBuffer, false);
} catch (Exception e) {
System.out.println("failed to create tablets " + e.getMessage());
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
index f043e90006..bf57f21f02 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
@@ -150,7 +150,7 @@ public class CanalSyncDataTest {
result = execPlanFragmentParams;
systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any,
- (TStorageMedium) any);
+ (TStorageMedium) any, false, true);
minTimes = 0;
result = backendIds;
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
index 17b9b6591f..172fbd5594 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
@@ -373,7 +373,7 @@ public class SystemInfoServiceTest {
Map<Long, Integer> beCounterMap = Maps.newHashMap();
for (int i = 0; i < 10000; ++i) {
Map<Tag, List<Long>> res =
infoService.selectBackendIdsForReplicaCreation(replicaAlloc,
- TStorageMedium.HDD);
+ TStorageMedium.HDD, false, false);
Assert.assertEquals(3, res.get(Tag.DEFAULT_BACKEND_TAG).size());
for (Long beId : res.get(Tag.DEFAULT_BACKEND_TAG)) {
beCounterMap.put(beId, beCounterMap.getOrDefault(beId, 0) + 1);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]