This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ea708d9bafb [fix](cluster key) fix mow cluster key with schema change (#40372)
ea708d9bafb is described below
commit ea708d9bafb1206c0c8e3387c6794e348e30afa3
Author: meiyi <[email protected]>
AuthorDate: Sat Sep 14 20:02:55 2024 +0800
[fix](cluster key) fix mow cluster key with schema change (#40372)
---
be/src/olap/memtable.cpp | 9 +-
be/src/olap/merger.cpp | 56 ++--
be/src/olap/merger.h | 3 +-
be/src/olap/rowset/segcompaction.cpp | 4 +-
be/src/olap/rowset/segment_v2/segment_writer.cpp | 24 +-
.../rowset/segment_v2/vertical_segment_writer.cpp | 8 +-
.../java/org/apache/doris/alter/RollupJobV2.java | 2 +-
.../apache/doris/alter/SchemaChangeHandler.java | 5 +-
.../org/apache/doris/alter/SchemaChangeJobV2.java | 24 ++
.../java/org/apache/doris/analysis/ColumnDef.java | 4 +
.../org/apache/doris/analysis/CreateTableStmt.java | 5 +-
.../java/org/apache/doris/analysis/KeysDesc.java | 67 ++--
.../java/org/apache/doris/backup/RestoreJob.java | 11 +
.../java/org/apache/doris/catalog/OlapTable.java | 11 +
.../cloud/datasource/CloudInternalCatalog.java | 3 +-
.../apache/doris/datasource/InternalCatalog.java | 49 ++-
.../org/apache/doris/master/ReportHandler.java | 12 +-
.../trees/plans/commands/info/CreateTableInfo.java | 94 +++---
.../unique_with_mow_c_p0/test_schema_change_ck.out | 365 +++++++++++++++++++++
.../unique_with_mow_c_p0/test_create_table.groovy | 2 +-
.../unique_with_mow_c_p0/test_schema_change.groovy | 6 +-
.../test_schema_change_ck.groovy | 262 +++++++++++++++
22 files changed, 849 insertions(+), 177 deletions(-)
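For readers skimming the patch: the crux of the fix is that cluster_key_idxes() now carries column unique ids, which stay stable across schema change, instead of schema positions. Each consumer therefore translates a unique id back to the current schema index (field_index) or fetches the column by uid (column_by_uid), as the BE hunks below do. A minimal standalone sketch of that translation, using a hypothetical MiniSchema type that is not part of this patch:

#include <cstdint>
#include <iostream>
#include <vector>

struct MiniSchema {
    std::vector<int32_t> unique_ids;       // unique id of the column at each schema position
    std::vector<int32_t> cluster_key_uids; // what cluster_key_idxes() now returns

    // analogous to TabletSchema::field_index(col_unique_id): uid -> position, -1 if absent
    int32_t field_index(int32_t uid) const {
        for (size_t i = 0; i < unique_ids.size(); ++i) {
            if (unique_ids[i] == uid) {
                return static_cast<int32_t>(i);
            }
        }
        return -1;
    }
};

int main() {
    // after a schema change a cluster key column may sit at a new position,
    // but its unique id does not move
    MiniSchema schema {{1, 2, 5, 3, 6, 4}, {3, 1, 4}};
    for (int32_t uid : schema.cluster_key_uids) {
        std::cout << "cluster key uid " << uid << " -> schema index "
                  << schema.field_index(uid) << "\n";
    }
    return 0;
}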
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 4f66a361650..671e07d7556 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -323,9 +323,14 @@ Status MemTable::_sort_by_cluster_keys() {
}
Tie tie = Tie(0, mutable_block.rows());
- for (auto i : _tablet_schema->cluster_key_idxes()) {
+ for (auto cid : _tablet_schema->cluster_key_idxes()) {
+ auto index = _tablet_schema->field_index(cid);
+ if (index == -1) {
+ return Status::InternalError("could not find cluster key column with unique_id=" +
+ std::to_string(cid) + " in tablet schema");
+ }
auto cmp = [&](const RowInBlock* lhs, const RowInBlock* rhs) -> int {
- return mutable_block.compare_one_column(lhs->_row_pos, rhs->_row_pos, i, -1);
+ return mutable_block.compare_one_column(lhs->_row_pos, rhs->_row_pos, index, -1);
};
_sort_one_column(row_in_blocks, tie, cmp);
}
diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp
index cba828785d9..ab034123ac8 100644
--- a/be/src/olap/merger.cpp
+++ b/be/src/olap/merger.cpp
@@ -57,30 +57,6 @@
#include "vec/olap/vertical_merge_iterator.h"
namespace doris {
-namespace {
-
-// for mow with cluster key table, the key group also contains cluster key columns.
-// the `key_group_cluster_key_idxes` marks the positions of cluster key columns in key group.
-void _generate_key_group_cluster_key_idxes(const TabletSchema& tablet_schema,
- std::vector<std::vector<uint32_t>>& column_groups,
- std::vector<uint32_t>& key_group_cluster_key_idxes) {
- if (column_groups.empty() || tablet_schema.cluster_key_idxes().empty()) {
- return;
- }
-
- auto& key_column_group = column_groups[0];
- for (const auto& index_in_tablet_schema : tablet_schema.cluster_key_idxes()) {
- for (auto j = 0; j < key_column_group.size(); ++j) {
- auto cid = key_column_group[j];
- if (cid == index_in_tablet_schema) {
- key_group_cluster_key_idxes.emplace_back(j);
- break;
- }
- }
- }
-}
-
-} // namespace
Status Merger::vmerge_rowsets(BaseTabletSPtr tablet, ReaderType reader_type,
const TabletSchema& cur_tablet_schema,
@@ -183,7 +159,8 @@ Status Merger::vmerge_rowsets(BaseTabletSPtr tablet, ReaderType reader_type,
// split columns into several groups, make sure all keys in one group
// unique_key should consider sequence&delete column
void Merger::vertical_split_columns(const TabletSchema& tablet_schema,
- std::vector<std::vector<uint32_t>>* column_groups) {
+ std::vector<std::vector<uint32_t>>* column_groups,
+ std::vector<uint32_t>* key_group_cluster_key_idxes) {
uint32_t num_key_cols = tablet_schema.num_key_columns();
uint32_t total_cols = tablet_schema.num_columns();
std::vector<uint32_t> key_columns;
@@ -206,8 +183,24 @@ void Merger::vertical_split_columns(const TabletSchema& tablet_schema,
}
if (!tablet_schema.cluster_key_idxes().empty()) {
for (const auto& cid : tablet_schema.cluster_key_idxes()) {
- if (cid >= num_key_cols) {
- key_columns.emplace_back(cid);
+ auto idx = tablet_schema.field_index(cid);
+ DCHECK(idx >= 0) << "could not find cluster key column with unique_id=" << cid
+ << " in tablet schema, table_id=" << tablet_schema.table_id();
+ if (idx >= num_key_cols) {
+ key_columns.emplace_back(idx);
+ }
+ }
+ // tablet schema unique ids: [1, 2, 5, 3, 6, 4], [1 2] is key columns
+ // cluster key unique ids: [3, 1, 4]
+ // the key_columns should be [0, 1, 3, 5]
+ // the key_group_cluster_key_idxes should be [2, 0, 3]
+ for (const auto& cid : tablet_schema.cluster_key_idxes()) {
+ auto idx = tablet_schema.field_index(cid);
+ for (auto i = 0; i < key_columns.size(); ++i) {
+ if (idx == key_columns[i]) {
+ key_group_cluster_key_idxes->emplace_back(i);
+ break;
+ }
}
}
}
@@ -218,14 +211,12 @@ void Merger::vertical_split_columns(const TabletSchema& tablet_schema,
if (!key_columns.empty()) {
column_groups->emplace_back(std::move(key_columns));
}
- auto&& cluster_key_idxes = tablet_schema.cluster_key_idxes();
std::vector<uint32_t> value_columns;
for (uint32_t i = num_key_cols; i < total_cols; ++i) {
if (i == sequence_col_idx || i == delete_sign_idx ||
- cluster_key_idxes.end() !=
- std::find(cluster_key_idxes.begin(), cluster_key_idxes.end(), i)) {
+ key_columns.end() != std::find(key_columns.begin(), key_columns.end(), i)) {
continue;
}
@@ -460,11 +451,8 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t
int64_t merge_way_num, Statistics* stats_output) {
LOG(INFO) << "Start to do vertical compaction, tablet_id: " << tablet->tablet_id();
std::vector<std::vector<uint32_t>> column_groups;
- vertical_split_columns(tablet_schema, &column_groups);
-
std::vector<uint32_t> key_group_cluster_key_idxes;
- _generate_key_group_cluster_key_idxes(tablet_schema, column_groups,
- key_group_cluster_key_idxes);
+ vertical_split_columns(tablet_schema, &column_groups, &key_group_cluster_key_idxes);
vectorized::RowSourcesBuffer row_sources_buf(
tablet->tablet_id(), dst_rowset_writer->context().tablet_path, reader_type);
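The bracketed example in vertical_split_columns() above is dense, so here is the same mapping written out as a self-contained sketch (simplified; the ad-hoc field_index lambda stands in for TabletSchema::field_index, and none of these names are part of the patch):

#include <algorithm>
#include <cstdint>
#include <vector>

// unique_ids: uid of the column at each schema position
// cluster_key_uids: cluster key columns by uid, in CLUSTER BY order
// num_key_cols: number of leading key columns in the schema
std::vector<uint32_t> key_group_cluster_key_idxes(const std::vector<int32_t>& unique_ids,
        const std::vector<int32_t>& cluster_key_uids, uint32_t num_key_cols) {
    auto field_index = [&](int32_t uid) -> int32_t {
        auto it = std::find(unique_ids.begin(), unique_ids.end(), uid);
        return it == unique_ids.end() ? -1 : static_cast<int32_t>(it - unique_ids.begin());
    };
    // the key group holds the key columns plus any cluster key columns outside them
    std::vector<uint32_t> key_columns;
    for (uint32_t i = 0; i < num_key_cols; ++i) {
        key_columns.push_back(i);
    }
    for (int32_t uid : cluster_key_uids) {
        int32_t idx = field_index(uid);
        if (idx >= static_cast<int32_t>(num_key_cols)) {
            key_columns.push_back(static_cast<uint32_t>(idx));
        }
    }
    // for each cluster key, record its offset inside the key group
    std::vector<uint32_t> result;
    for (int32_t uid : cluster_key_uids) {
        int32_t idx = field_index(uid);
        for (uint32_t i = 0; i < key_columns.size(); ++i) {
            if (static_cast<int32_t>(key_columns[i]) == idx) {
                result.push_back(i);
                break;
            }
        }
    }
    // uids {1, 2, 5, 3, 6, 4} with 2 key columns and cluster uids {3, 1, 4}
    // give key_columns {0, 1, 3, 5} and result {2, 0, 3}
    return result;
}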
diff --git a/be/src/olap/merger.h b/be/src/olap/merger.h
index cb05162b3bc..7d430cde7f3 100644
--- a/be/src/olap/merger.h
+++ b/be/src/olap/merger.h
@@ -66,7 +66,8 @@ public:
// for vertical compaction
static void vertical_split_columns(const TabletSchema& tablet_schema,
- std::vector<std::vector<uint32_t>>* column_groups);
+ std::vector<std::vector<uint32_t>>* column_groups,
+ std::vector<uint32_t>* key_group_cluster_key_idxes);
static Status vertical_compact_one_group(
BaseTabletSPtr tablet, ReaderType reader_type, const TabletSchema& tablet_schema,
bool is_key, const std::vector<uint32_t>& column_group,
diff --git a/be/src/olap/rowset/segcompaction.cpp b/be/src/olap/rowset/segcompaction.cpp
index 374056f7b9d..fc8baf952c1 100644
--- a/be/src/olap/rowset/segcompaction.cpp
+++ b/be/src/olap/rowset/segcompaction.cpp
@@ -248,7 +248,9 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt
}
std::vector<std::vector<uint32_t>> column_groups;
- Merger::vertical_split_columns(*ctx.tablet_schema, &column_groups);
+ std::vector<uint32_t> key_group_cluster_key_idxes;
+ Merger::vertical_split_columns(*ctx.tablet_schema, &column_groups,
+ &key_group_cluster_key_idxes);
vectorized::RowSourcesBuffer row_sources_buf(tablet->tablet_id(), tablet->tablet_path(),
ReaderType::READER_SEGMENT_COMPACTION);
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 0e9b55d99b8..225677f5d1f 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -126,7 +126,7 @@ SegmentWriter::SegmentWriter(io::FileWriter* file_writer, uint32_t segment_id,
_key_index_size.clear();
_num_sort_key_columns = _tablet_schema->cluster_key_idxes().size();
for (auto cid : _tablet_schema->cluster_key_idxes()) {
- const auto& column = _tablet_schema->column(cid);
+ const auto& column = _tablet_schema->column_by_uid(cid);
_key_coders.push_back(get_key_coder(column.type()));
_key_index_size.push_back(column.index_length());
}
@@ -755,17 +755,31 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po
// 2. generate short key index (use cluster key)
key_columns.clear();
for (const auto& cid : _tablet_schema->cluster_key_idxes()) {
- for (size_t id = 0; id < _column_writers.size(); ++id) {
- // olap data convertor always start from id = 0
- if (cid == _column_ids[id]) {
- auto converted_result = _olap_data_convertor->convert_column_data(id);
+ // find cluster key index in tablet schema
+ auto cluster_key_index = _tablet_schema->field_index(cid);
+ if (cluster_key_index == -1) {
+ return Status::InternalError(
+ "could not find cluster key column with
unique_id=" +
+ std::to_string(cid) + " in tablet schema");
+ }
+ bool found = false;
+ for (auto i = 0; i < _column_ids.size(); ++i) {
+ if (_column_ids[i] == cluster_key_index) {
+ auto converted_result = _olap_data_convertor->convert_column_data(i);
if (!converted_result.first.ok()) {
return converted_result.first;
}
key_columns.push_back(converted_result.second);
+ found = true;
break;
}
}
+ if (!found) {
+ return Status::InternalError(
+ "could not found cluster key column with
unique_id=" +
+ std::to_string(cid) +
+ ", tablet schema index=" +
std::to_string(cluster_key_index));
+ }
}
RETURN_IF_ERROR(_generate_short_key_index(key_columns, num_rows, short_key_pos));
} else if (_is_mow()) {
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 5663c3779df..4863f2c0401 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -130,7 +130,7 @@ VerticalSegmentWriter::VerticalSegmentWriter(io::FileWriter* file_writer, uint32
_key_index_size.clear();
_num_sort_key_columns = _tablet_schema->cluster_key_idxes().size();
for (auto cid : _tablet_schema->cluster_key_idxes()) {
- const auto& column = _tablet_schema->column(cid);
+ const auto& column = _tablet_schema->column_by_uid(cid);
_key_coders.push_back(get_key_coder(column.type()));
_key_index_size.push_back(column.index_length());
}
@@ -714,6 +714,7 @@ Status VerticalSegmentWriter::write_batch() {
std::vector<vectorized::IOlapColumnDataAccessor*> key_columns;
vectorized::IOlapColumnDataAccessor* seq_column = nullptr;
+ // the key is cluster key column unique id
std::map<uint32_t, vectorized::IOlapColumnDataAccessor*> cid_to_column;
for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) {
RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid), _tablet_schema));
@@ -732,11 +733,12 @@ Status VerticalSegmentWriter::write_batch() {
if (_tablet_schema->has_sequence_col() && cid == _tablet_schema->sequence_col_idx()) {
seq_column = column;
}
+ auto column_unique_id = _tablet_schema->column(cid).unique_id();
if (_is_mow_with_cluster_key() &&
std::find(_tablet_schema->cluster_key_idxes().begin(),
_tablet_schema->cluster_key_idxes().end(),
- cid) != _tablet_schema->cluster_key_idxes().end()) {
- cid_to_column[cid] = column;
+ column_unique_id) != _tablet_schema->cluster_key_idxes().end()) {
+ cid_to_column[column_unique_id] = column;
}
RETURN_IF_ERROR(_column_writers[cid]->append(column->get_nullmap(), column->get_data(),
data.num_rows));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index 3a29c0c542e..62eff357875 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -233,7 +233,6 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
TStorageMedium storageMedium = tbl.getPartitionInfo().getDataProperty(partitionId).getStorageMedium();
TTabletType tabletType = tbl.getPartitionInfo().getTabletType(partitionId);
MaterializedIndex rollupIndex = entry.getValue();
-
Map<Long, Long> tabletIdMap = this.partitionIdToBaseRollupTabletIdMap.get(partitionId);
for (Tablet rollupTablet : rollupIndex.getTablets()) {
long rollupTabletId = rollupTablet.getId();
@@ -276,6 +275,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
if (this.storageFormat != null) {
createReplicaTask.setStorageFormat(this.storageFormat);
}
+ // rollup replica does not need to set mow cluster keys
batchTask.addTask(createReplicaTask);
} // end for rollupReplicas
} // end for rollupTablets
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index c47344f14c5..43857b2e898 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -310,7 +310,7 @@ public class SchemaChangeHandler extends AlterHandler {
boolean lightSchemaChange = olapTable.getEnableLightSchemaChange();
/*
* UNIQUE:
- * Can not drop any key column.
+ * Can not drop any key column or cluster key column
* AGGREGATION:
* Can not drp any key column is has value with REPLACE method
*/
@@ -844,9 +844,6 @@ public class SchemaChangeHandler extends AlterHandler {
if (!column.isVisible()) {
newSchema.add(column);
}
- if (column.isClusterKey()) {
- throw new DdlException("Can not modify column order in
Unique data model table");
- }
}
}
if (newSchema.size() != targetIndexSchema.size()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index f44d9416e72..c514bf6306e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -71,6 +71,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Table;
import com.google.common.collect.Table.Cell;
import com.google.gson.annotations.SerializedName;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -216,6 +217,20 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
partitionOriginIndexIdMap.clear();
}
+ private boolean isShadowIndexOfBase(long shadowIdxId, OlapTable tbl) {
+ if (indexIdToName.get(shadowIdxId).startsWith(SchemaChangeHandler.SHADOW_NAME_PREFIX)) {
+ String shadowIndexName = indexIdToName.get(shadowIdxId);
+ String indexName = shadowIndexName
+ .substring(SchemaChangeHandler.SHADOW_NAME_PREFIX.length());
+ long indexId = tbl.getIndexIdByName(indexName);
+ LOG.info("shadow index id: {}, shadow index name: {}, pointer to
index id: {}, index name: {}, "
+ + "base index id: {}, table_id: {}", shadowIdxId,
shadowIndexName, indexId, indexName,
+ tbl.getBaseIndexId(), tbl.getId());
+ return indexId == tbl.getBaseIndexId();
+ }
+ return false;
+ }
+
protected void createShadowIndexReplica() throws AlterCancelException {
Database db = Env.getCurrentInternalCatalog()
.getDbOrException(dbId, s -> new AlterCancelException("Database " + s + " does not exist"));
@@ -261,6 +276,10 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
short shadowShortKeyColumnCount = indexShortKeyMap.get(shadowIdxId);
List<Column> shadowSchema = indexSchemaMap.get(shadowIdxId);
+ List<Integer> clusterKeyIndexes = null;
+ if (shadowIdxId == tbl.getBaseIndexId() || isShadowIndexOfBase(shadowIdxId, tbl)) {
+ clusterKeyIndexes = OlapTable.getClusterKeyIndexes(shadowSchema);
+ }
int shadowSchemaHash = indexSchemaVersionAndHashMap.get(shadowIdxId).schemaHash;
long originIndexId = indexIdMap.get(shadowIdxId);
int originSchemaHash = tbl.getSchemaHashByIndexId(originIndexId);
@@ -309,6 +328,11 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
}
createReplicaTask.setInvertedIndexFileStorageFormat(tbl
.getInvertedIndexFileStorageFormat());
+ if (!CollectionUtils.isEmpty(clusterKeyIndexes)) {
+ createReplicaTask.setClusterKeyIndexes(clusterKeyIndexes);
+ LOG.info("table: {}, partition: {}, index: {}, tablet: {}, cluster key indexes: {}",
+ tableId, partitionId, shadowIdxId, shadowTabletId, clusterKeyIndexes);
+ }
batchTask.addTask(createReplicaTask);
} // end for rollupReplicas
} // end for rollupTablets
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java
index 33474f8263c..625a3b3b131 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java
@@ -354,6 +354,10 @@ public class ColumnDef {
return visible;
}
+ public int getClusterKeyId() {
+ return this.clusterKeyId;
+ }
+
public void setClusterKeyId(int clusterKeyId) {
this.clusterKeyId = clusterKeyId;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
index 865489a113e..d3f37b632ca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
@@ -421,9 +421,6 @@ public class CreateTableStmt extends DdlStmt implements NotFallbackInParser {
keysDesc.analyze(columnDefs);
if (!CollectionUtils.isEmpty(keysDesc.getClusterKeysColumnNames())) {
- if (Config.isCloudMode()) {
- throw new AnalysisException("Cluster key is not supported
in cloud mode");
- }
if (!enableUniqueKeyMergeOnWrite) {
throw new AnalysisException("Cluster keys only support
unique keys table which enabled "
+
PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE);
@@ -503,7 +500,7 @@ public class CreateTableStmt extends DdlStmt implements NotFallbackInParser {
columnDef.getType().getPrimitiveType() + " column
can't support aggregation "
+ columnDef.getAggregateType());
}
- if (columnDef.isKey()) {
+ if (columnDef.isKey() || columnDef.getClusterKeyId() != -1) {
throw new AnalysisException(columnDef.getType().getPrimitiveType()
+ " can only be used in the non-key column of the duplicate table at present.");
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/KeysDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/KeysDesc.java
index e7359657ef2..0076ce74de3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/KeysDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/KeysDesc.java
@@ -34,7 +34,6 @@ public class KeysDesc implements Writable {
private KeysType type;
private List<String> keysColumnNames;
private List<String> clusterKeysColumnNames;
- private List<Integer> clusterKeysColumnIds = null;
public KeysDesc() {
this.type = KeysType.AGG_KEYS;
@@ -51,12 +50,6 @@ public class KeysDesc implements Writable {
this.clusterKeysColumnNames = clusterKeyColumnNames;
}
- public KeysDesc(KeysType type, List<String> keysColumnNames, List<String> clusterKeyColumnNames,
- List<Integer> clusterKeysColumnIds) {
- this(type, keysColumnNames, clusterKeyColumnNames);
- this.clusterKeysColumnIds = clusterKeysColumnIds;
- }
-
public KeysType getKeysType() {
return type;
}
@@ -69,10 +62,6 @@ public class KeysDesc implements Writable {
return clusterKeysColumnNames;
}
- public List<Integer> getClusterKeysColumnIds() {
- return clusterKeysColumnIds;
- }
-
public boolean containsCol(String colName) {
return keysColumnNames.contains(colName);
}
@@ -90,17 +79,6 @@ public class KeysDesc implements Writable {
throw new AnalysisException("The number of key columns should be
less than the number of columns.");
}
- if (clusterKeysColumnNames != null) {
- if (Config.isCloudMode()) {
- throw new AnalysisException("Cluster key is not supported in
cloud mode");
- }
- if (type != KeysType.UNIQUE_KEYS) {
- throw new AnalysisException("Cluster keys only support unique
keys table.");
- }
- clusterKeysColumnIds = Lists.newArrayList();
- analyzeClusterKeys(cols);
- }
-
for (int i = 0; i < keysColumnNames.size(); ++i) {
String name = cols.get(i).getName();
if (!keysColumnNames.get(i).equalsIgnoreCase(name)) {
@@ -135,39 +113,48 @@ public class KeysDesc implements Writable {
}
if (clusterKeysColumnNames != null) {
- int minKeySize = keysColumnNames.size() < clusterKeysColumnNames.size() ? keysColumnNames.size()
- : clusterKeysColumnNames.size();
- boolean sameKey = true;
- for (int i = 0; i < minKeySize; ++i) {
- if (!keysColumnNames.get(i).equalsIgnoreCase(clusterKeysColumnNames.get(i))) {
- sameKey = false;
- break;
- }
- }
- if (sameKey) {
- throw new AnalysisException("Unique keys and cluster keys
should be different.");
- }
+ analyzeClusterKeys(cols);
}
}
private void analyzeClusterKeys(List<ColumnDef> cols) throws AnalysisException {
- for (int i = 0; i < clusterKeysColumnNames.size(); ++i) {
+ if (Config.isCloudMode()) {
+ throw new AnalysisException("Cluster key is not supported in cloud
mode");
+ }
+ if (type != KeysType.UNIQUE_KEYS) {
+ throw new AnalysisException("Cluster keys only support unique keys
table");
+ }
+ // check that cluster keys are not duplicated
+ for (int i = 0; i < clusterKeysColumnNames.size(); i++) {
String name = clusterKeysColumnNames.get(i);
- // check if key is duplicate
for (int j = 0; j < i; j++) {
if (clusterKeysColumnNames.get(j).equalsIgnoreCase(name)) {
throw new AnalysisException("Duplicate cluster key
column[" + name + "].");
}
}
- // check if key exists and generate key column ids
+ }
+ // check that cluster keys are not equal to primary keys
+ int minKeySize = Math.min(keysColumnNames.size(), clusterKeysColumnNames.size());
+ boolean sameKey = true;
+ for (int i = 0; i < minKeySize; i++) {
+ if (!keysColumnNames.get(i).equalsIgnoreCase(clusterKeysColumnNames.get(i))) {
+ sameKey = false;
+ break;
+ }
+ }
+ if (sameKey) {
+ throw new AnalysisException("Unique keys and cluster keys should
be different.");
+ }
+ // check that cluster key column exists
+ for (int i = 0; i < clusterKeysColumnNames.size(); i++) {
+ String name = clusterKeysColumnNames.get(i);
for (int j = 0; j < cols.size(); j++) {
if (cols.get(j).getName().equalsIgnoreCase(name)) {
- cols.get(j).setClusterKeyId(clusterKeysColumnIds.size());
- clusterKeysColumnIds.add(j);
+ cols.get(j).setClusterKeyId(i);
break;
}
if (j == cols.size() - 1) {
- throw new AnalysisException("Key cluster column[" + name +
"] doesn't exist.");
+ throw new AnalysisException("Cluster key column[" + name +
"] doesn't exist.");
}
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 13a6d3a8051..27ad19e1762 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -100,6 +100,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Table.Cell;
import com.google.gson.annotations.SerializedName;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -1239,6 +1240,10 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
MaterializedIndexMeta indexMeta = localTbl.getIndexMetaByIndexId(restoredIdx.getId());
List<Index> indexes = restoredIdx.getId() == localTbl.getBaseIndexId()
? localTbl.getCopiedIndexes() : null;
+ List<Integer> clusterKeyIndexes = null;
+ if (indexMeta.getIndexId() == localTbl.getBaseIndexId() || localTbl.isShadowIndex(indexMeta.getIndexId())) {
+ clusterKeyIndexes = OlapTable.getClusterKeyIndexes(indexMeta.getSchema());
+ }
for (Tablet restoreTablet : restoredIdx.getTablets()) {
TabletMeta tabletMeta = new TabletMeta(db.getId(), localTbl.getId(), restorePart.getId(),
restoredIdx.getId(), indexMeta.getSchemaHash(), TStorageMedium.HDD);
@@ -1282,6 +1287,12 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
LOG.info("set base tablet {} for replica {} in restore
job {}, tablet id={}",
baseTablet.first, restoreReplica.getId(),
jobId, restoreTablet.getId());
}
+ if (!CollectionUtils.isEmpty(clusterKeyIndexes)) {
+ task.setClusterKeyIndexes(clusterKeyIndexes);
+ LOG.info("table: {}, partition: {}, index: {}, tablet:
{}, cluster key indexes: {}",
+ localTbl.getId(), restorePart.getId(),
restoredIdx.getId(), restoreTablet.getId(),
+ clusterKeyIndexes);
+ }
batchTask.addTask(task);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 20737d9a035..9728a9e4154 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -110,6 +110,7 @@ import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@@ -3131,6 +3132,16 @@ public class OlapTable extends Table implements MTMVRelatedTableIf, GsonPostProc
}
}
+ public static List<Integer> getClusterKeyIndexes(List<Column> columns) {
+ Map<Integer, Integer> clusterKeyIndexes = new TreeMap<>();
+ for (Column column : columns) {
+ if (column.isClusterKey()) {
+ clusterKeyIndexes.put(column.getClusterKeyId(),
column.getUniqueId());
+ }
+ }
+ return clusterKeyIndexes.isEmpty() ? null : new ArrayList<>(clusterKeyIndexes.values());
+ }
+
public long getVisibleVersionTime() {
return tableAttributes.getVisibleVersionTime();
}
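The TreeMap in getClusterKeyIndexes() above is what keeps the returned unique ids in declared CLUSTER BY order rather than schema order, which matters once schema change moves columns around. The same idea in C++ terms, as a sketch with hypothetical types (std::map plays the role of Java's TreeMap):

#include <cstdint>
#include <map>
#include <optional>
#include <vector>

struct ColumnInfo {
    int32_t unique_id;
    int32_t cluster_key_id; // position in CLUSTER BY, or -1 if not a cluster key
};

std::optional<std::vector<int32_t>> cluster_key_indexes(const std::vector<ColumnInfo>& columns) {
    std::map<int32_t, int32_t> ordered; // cluster_key_id -> unique_id, iterated in key order
    for (const auto& col : columns) {
        if (col.cluster_key_id >= 0) {
            ordered[col.cluster_key_id] = col.unique_id;
        }
    }
    if (ordered.empty()) {
        return std::nullopt; // plays the role of the null return in the Java code
    }
    std::vector<int32_t> result;
    for (const auto& entry : ordered) {
        result.push_back(entry.second);
    }
    return result;
}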
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
index c1c58f7b898..78044f2190d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
@@ -101,8 +101,7 @@ public class CloudInternalCatalog extends InternalCatalog {
String storagePolicy,
IdGeneratorBuffer idGeneratorBuffer,
BinlogConfig binlogConfig,
- boolean isStorageMediumSpecified,
- List<Integer> clusterKeyIndexes)
+ boolean isStorageMediumSpecified)
throws DdlException {
// create base index first.
Preconditions.checkArgument(tbl.getBaseIndexId() != -1);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 77fe701f204..03c33a21e94 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -1770,8 +1770,7 @@ public class InternalCatalog implements CatalogIf<Database> {
singlePartitionDesc.isInMemory(), singlePartitionDesc.getTabletType(),
storagePolicy, idGeneratorBuffer,
- binlogConfig, dataProperty.isStorageMediumSpecified(), null);
- // TODO cluster key ids
+ binlogConfig, dataProperty.isStorageMediumSpecified());
// check again
olapTable = db.getOlapTableOrDdlException(tableName);
@@ -2086,8 +2085,7 @@ public class InternalCatalog implements CatalogIf<Database> {
String storagePolicy,
IdGeneratorBuffer idGeneratorBuffer,
BinlogConfig binlogConfig,
- boolean isStorageMediumSpecified,
- List<Integer> clusterKeyIndexes)
+ boolean isStorageMediumSpecified)
throws DdlException {
// create base index first.
Preconditions.checkArgument(tbl.getBaseIndexId() != -1);
@@ -2145,6 +2143,11 @@ public class InternalCatalog implements CatalogIf<Database> {
short shortKeyColumnCount = indexMeta.getShortKeyColumnCount();
TStorageType storageType = indexMeta.getStorageType();
List<Column> schema = indexMeta.getSchema();
+ List<Integer> clusterKeyIndexes = null;
+ if (indexId == tbl.getBaseIndexId()) {
+ // only base and shadow index need cluster key indexes
+ clusterKeyIndexes = OlapTable.getClusterKeyIndexes(schema);
+ }
KeysType keysType = indexMeta.getKeysType();
List<Index> indexes = indexId == tbl.getBaseIndexId() ? tbl.getCopiedIndexes() : null;
int totalTaskNum = index.getTablets().size() * totalReplicaNum;
@@ -2176,7 +2179,11 @@ public class InternalCatalog implements CatalogIf<Database> {
task.setStorageFormat(tbl.getStorageFormat());
task.setInvertedIndexFileStorageFormat(tbl.getInvertedIndexFileStorageFormat());
- task.setClusterKeyIndexes(clusterKeyIndexes);
+ if (!CollectionUtils.isEmpty(clusterKeyIndexes)) {
+ task.setClusterKeyIndexes(clusterKeyIndexes);
+ LOG.info("table: {}, partition: {}, index: {}, tablet:
{}, cluster key indexes: {}",
+ tbl.getId(), partitionId, indexId, tabletId,
clusterKeyIndexes);
+ }
batchTask.addTask(task);
// add to AgentTaskQueue for handling finish report.
// not for resending task
@@ -2649,8 +2656,8 @@ public class InternalCatalog implements CatalogIf<Database> {
olapTable.setRowStorePageSize(rowStorePageSize);
// check data sort properties
- int keyColumnSize = CollectionUtils.isEmpty(keysDesc.getClusterKeysColumnIds()) ? keysDesc.keysColumnSize() :
- keysDesc.getClusterKeysColumnIds().size();
+ int keyColumnSize = CollectionUtils.isEmpty(keysDesc.getClusterKeysColumnNames()) ? keysDesc.keysColumnSize() :
+ keysDesc.getClusterKeysColumnNames().size();
DataSortInfo dataSortInfo = PropertyAnalyzer.analyzeDataSortInfo(properties, keysType,
keyColumnSize, storageFormat);
olapTable.setDataSortInfo(dataSortInfo);
@@ -2662,6 +2669,10 @@ public class InternalCatalog implements CatalogIf<Database> {
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
+ if (enableUniqueKeyMergeOnWrite && !enableLightSchemaChange && !CollectionUtils.isEmpty(
+ keysDesc.getClusterKeysColumnNames())) {
+ throw new DdlException("Unique merge-on-write table with cluster keys must enable light schema change");
+ }
}
olapTable.setEnableUniqueKeyMergeOnWrite(enableUniqueKeyMergeOnWrite);
@@ -2990,18 +3001,15 @@ public class InternalCatalog implements CatalogIf<Database> {
throw new DdlException(e.getMessage());
}
- // analyse group commit interval ms
- int groupCommitIntervalMs;
try {
- groupCommitIntervalMs = PropertyAnalyzer.analyzeGroupCommitIntervalMs(properties);
+ int groupCommitIntervalMs = PropertyAnalyzer.analyzeGroupCommitIntervalMs(properties);
olapTable.setGroupCommitIntervalMs(groupCommitIntervalMs);
} catch (Exception e) {
throw new DdlException(e.getMessage());
}
- int groupCommitDataBytes;
try {
- groupCommitDataBytes = PropertyAnalyzer.analyzeGroupCommitDataBytes(properties);
+ int groupCommitDataBytes = PropertyAnalyzer.analyzeGroupCommitDataBytes(properties);
olapTable.setGroupCommitDataBytes(groupCommitDataBytes);
} catch (Exception e) {
throw new DdlException(e.getMessage());
@@ -3057,8 +3065,7 @@ public class InternalCatalog implements CatalogIf<Database> {
storagePolicy,
idGeneratorBuffer,
binlogConfigForTask,
- partitionInfo.getDataProperty(partitionId).isStorageMediumSpecified(),
- keysDesc.getClusterKeysColumnIds());
+ partitionInfo.getDataProperty(partitionId).isStorageMediumSpecified());
afterCreatePartitions(db.getId(), olapTable.getId(), null, olapTable.getIndexIdList(), true);
olapTable.addPartition(partition);
@@ -3142,8 +3149,7 @@ public class InternalCatalog implements CatalogIf<Database> {
partitionInfo.getTabletType(entry.getValue()),
partionStoragePolicy, idGeneratorBuffer,
binlogConfigForTask,
- dataProperty.isStorageMediumSpecified(),
- keysDesc.getClusterKeysColumnIds());
+ dataProperty.isStorageMediumSpecified());
olapTable.addPartition(partition);
olapTable.getPartitionInfo().getDataProperty(partition.getId())
.setStoragePolicy(partionStoragePolicy);
@@ -3566,14 +3572,6 @@ public class InternalCatalog implements CatalogIf<Database> {
Env.getCurrentInvertedIndex().deleteTablet(tabletId);
}
};
- Map<Integer, Integer> clusterKeyMap = new TreeMap<>();
- for (int i = 0; i < olapTable.getBaseSchema().size(); i++) {
- Column column = olapTable.getBaseSchema().get(i);
- if (column.getClusterKeyId() != -1) {
- clusterKeyMap.put(column.getClusterKeyId(), i);
- }
- }
- List<Integer> clusterKeyIdxes = clusterKeyMap.values().stream().collect(Collectors.toList());
try {
long bufferSize = IdGeneratorUtil.getBufferSizeForTruncateTable(copiedTbl, origPartitions.values());
IdGeneratorBuffer idGeneratorBuffer =
@@ -3609,8 +3607,7 @@ public class InternalCatalog implements CatalogIf<Database> {
copiedTbl.getPartitionInfo().getTabletType(oldPartitionId),
olapTable.getPartitionInfo().getDataProperty(oldPartitionId).getStoragePolicy(),
idGeneratorBuffer, binlogConfig,
- copiedTbl.getPartitionInfo().getDataProperty(oldPartitionId).isStorageMediumSpecified(),
- clusterKeyIdxes);
+ copiedTbl.getPartitionInfo().getDataProperty(oldPartitionId).isStorageMediumSpecified());
newPartitions.add(newPartition);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index f7702a49554..a4a5273e8ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -96,6 +96,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -964,10 +965,19 @@ public class ReportHandler extends Daemon {
objectPool,
olapTable.rowStorePageSize(),
olapTable.variantEnableFlattenNested());
-
createReplicaTask.setIsRecoverTask(true);
createReplicaTask.setInvertedIndexFileStorageFormat(olapTable
.getInvertedIndexFileStorageFormat());
+ if (indexId == olapTable.getBaseIndexId() || olapTable.isShadowIndex(indexId)) {
+ List<Integer> clusterKeyIndexes = OlapTable.getClusterKeyIndexes(
+ indexMeta.getSchema());
+ if (!CollectionUtils.isEmpty(clusterKeyIndexes)) {
+ createReplicaTask.setClusterKeyIndexes(clusterKeyIndexes);
+ LOG.info("table: {}, partition: {}, index: {}, tablet: {}, "
+ + "cluster key indexes: {}", tableId, partitionId, indexId,
+ tabletId, clusterKeyIndexes);
+ }
+ }
createReplicaBatchTask.addTask(createReplicaTask);
} else {
// just set this replica as bad
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
index 730c6f115a3..04ce3786bb6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
@@ -131,7 +131,6 @@ public class CreateTableInfo {
private boolean isExternal = false;
private String clusterName = null;
private List<String> clusterKeysColumnNames = null;
- private List<Integer> clusterKeysColumnIds = null;
private PartitionTableInfo partitionTableInfo; // get when validate
/**
@@ -424,9 +423,6 @@ public class CreateTableInfo {
validateKeyColumns();
if (!clusterKeysColumnNames.isEmpty()) {
- if (Config.isCloudMode()) {
- throw new AnalysisException("Cluster key is not supported
in cloud mode");
- }
if (!isEnableMergeOnWrite) {
throw new AnalysisException(
"Cluster keys only support unique keys table which
enabled "
@@ -736,50 +732,6 @@ public class CreateTableInfo {
"The number of key columns should be less than the number
of columns.");
}
- if (!clusterKeysColumnNames.isEmpty()) {
- if (Config.isCloudMode()) {
- throw new AnalysisException("Cluster key is not supported in
cloud mode");
- }
- if (keysType != KeysType.UNIQUE_KEYS) {
- throw new AnalysisException("Cluster keys only support unique
keys table.");
- }
- clusterKeysColumnIds = Lists.newArrayList();
- for (int i = 0; i < clusterKeysColumnNames.size(); ++i) {
- String name = clusterKeysColumnNames.get(i);
- // check if key is duplicate
- for (int j = 0; j < i; j++) {
- if (clusterKeysColumnNames.get(j).equalsIgnoreCase(name)) {
- throw new AnalysisException("Duplicate cluster key
column[" + name + "].");
- }
- }
- // check if key exists and generate key column ids
- for (int j = 0; j < columns.size(); j++) {
- if (columns.get(j).getName().equalsIgnoreCase(name)) {
- columns.get(j).setClusterKeyId(clusterKeysColumnIds.size());
- clusterKeysColumnIds.add(j);
- break;
- }
- if (j == columns.size() - 1) {
- throw new AnalysisException(
- "Key cluster column[" + name + "] doesn't
exist.");
- }
- }
- }
-
- int minKeySize = keys.size() < clusterKeysColumnNames.size() ? keys.size()
- : clusterKeysColumnNames.size();
- boolean sameKey = true;
- for (int i = 0; i < minKeySize; ++i) {
- if (!keys.get(i).equalsIgnoreCase(clusterKeysColumnNames.get(i))) {
- sameKey = false;
- break;
- }
- }
- if (sameKey) {
- throw new AnalysisException("Unique keys and cluster keys
should be different.");
- }
- }
-
for (int i = 0; i < keys.size(); ++i) {
String name = columns.get(i).getName();
if (!keys.get(i).equalsIgnoreCase(name)) {
@@ -815,6 +767,50 @@ public class CreateTableInfo {
}
}
}
+
+ if (!clusterKeysColumnNames.isEmpty()) {
+ // the same code as KeysDesc#analyzeClusterKeys
+ if (Config.isCloudMode()) {
+ throw new AnalysisException("Cluster key is not supported in
cloud mode");
+ }
+ if (keysType != KeysType.UNIQUE_KEYS) {
+ throw new AnalysisException("Cluster keys only support unique
keys table");
+ }
+ // check that cluster keys are not duplicated
+ for (int i = 0; i < clusterKeysColumnNames.size(); i++) {
+ String name = clusterKeysColumnNames.get(i);
+ for (int j = 0; j < i; j++) {
+ if (clusterKeysColumnNames.get(j).equalsIgnoreCase(name)) {
+ throw new AnalysisException("Duplicate cluster key
column[" + name + "].");
+ }
+ }
+ }
+ // check that cluster keys are not equal to primary keys
+ int minKeySize = Math.min(keys.size(), clusterKeysColumnNames.size());
+ boolean sameKey = true;
+ for (int i = 0; i < minKeySize; ++i) {
+ if (!keys.get(i).equalsIgnoreCase(clusterKeysColumnNames.get(i))) {
+ sameKey = false;
+ break;
+ }
+ }
+ if (sameKey) {
+ throw new AnalysisException("Unique keys and cluster keys
should be different.");
+ }
+ // check that cluster key column exists
+ for (int i = 0; i < clusterKeysColumnNames.size(); ++i) {
+ String name = clusterKeysColumnNames.get(i);
+ for (int j = 0; j < columns.size(); j++) {
+ if (columns.get(j).getName().equalsIgnoreCase(name)) {
+ columns.get(j).setClusterKeyId(i);
+ break;
+ }
+ if (j == columns.size() - 1) {
+ throw new AnalysisException("Cluster key column[" +
name + "] doesn't exist.");
+ }
+ }
+ }
+ }
}
/**
@@ -858,7 +854,7 @@ public class CreateTableInfo {
return new CreateTableStmt(ifNotExists, isExternal,
new TableName(ctlName, dbName, tableName),
catalogColumns, catalogIndexes, engineName,
- new KeysDesc(keysType, keys, clusterKeysColumnNames, clusterKeysColumnIds),
+ new KeysDesc(keysType, keys, clusterKeysColumnNames),
partitionDesc, distributionDesc, Maps.newHashMap(properties), extProperties,
comment, addRollups, null);
}
diff --git a/regression-test/data/unique_with_mow_c_p0/test_schema_change_ck.out b/regression-test/data/unique_with_mow_c_p0/test_schema_change_ck.out
new file mode 100644
index 00000000000..50028960ab1
--- /dev/null
+++ b/regression-test/data/unique_with_mow_c_p0/test_schema_change_ck.out
@@ -0,0 +1,365 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_original --
+11 28 38
+10 29 39
+
+-- !select_add_c4 --
+11 28 38 \N
+10 29 39 \N
+13 27 36 40
+12 26 37 40
+
+-- !select_add_c5 --
+11 \N 28 38 \N
+10 \N 29 39 \N
+13 \N 27 36 40
+12 \N 26 37 40
+15 50 20 34 40
+14 50 20 35 40
+
+-- !select_add_c6 --
+11 \N 28 \N 38 \N
+10 \N 29 \N 39 \N
+13 \N 27 \N 36 40
+12 \N 26 \N 37 40
+15 50 20 \N 34 40
+14 50 20 \N 35 40
+17 50 20 60 32 40
+16 50 20 60 33 40
+
+-- !select_add_k2 --
+11 \N \N 28 \N 38 \N
+10 \N \N 29 \N 39 \N
+13 \N \N 27 \N 36 40
+12 \N \N 26 \N 37 40
+15 \N 50 20 \N 34 40
+14 \N 50 20 \N 35 40
+17 \N 50 20 60 32 40
+16 \N 50 20 60 33 40
+19 200 \N 20 \N 30 \N
+18 200 \N 20 \N 31 \N
+
+-- !select_drop_c4 --
+11 \N \N 28 \N 38
+10 \N \N 29 \N 39
+13 \N \N 27 \N 36
+12 \N \N 26 \N 37
+15 \N 50 20 \N 34
+14 \N 50 20 \N 35
+17 \N 50 20 60 32
+16 \N 50 20 60 33
+19 200 \N 20 \N 30
+18 200 \N 20 \N 31
+119 200 \N 20 \N 30
+118 200 \N 20 \N 31
+
+-- !select_drop_c5 --
+11 \N 28 \N 38
+10 \N 29 \N 39
+13 \N 27 \N 36
+12 \N 26 \N 37
+15 \N 20 \N 34
+14 \N 20 \N 35
+17 \N 20 60 32
+16 \N 20 60 33
+19 200 20 \N 30
+18 200 20 \N 31
+119 200 20 \N 30
+118 200 20 \N 31
+117 200 20 \N 32
+116 200 20 \N 33
+
+-- !select_drop_c6 --
+11 \N 28 38
+10 \N 29 39
+13 \N 27 36
+12 \N 26 37
+15 \N 20 34
+14 \N 20 35
+17 \N 20 32
+16 \N 20 33
+19 200 20 30
+18 200 20 31
+119 200 20 30
+118 200 20 31
+117 200 20 32
+116 200 20 33
+115 200 25 34
+114 200 24 35
+
+-- !select_reorder --
+11 \N 38 28
+10 \N 39 29
+13 \N 36 27
+12 \N 37 26
+15 \N 34 20
+14 \N 35 20
+17 \N 32 20
+16 \N 33 20
+19 200 30 20
+18 200 31 20
+119 200 30 20
+118 200 31 20
+117 200 32 20
+116 200 33 20
+115 200 34 25
+114 200 35 24
+113 200 36 23
+112 200 37 22
+
+-- !select_modify_k2 --
+11 \N 38 28
+10 \N 39 29
+13 \N 36 27
+12 \N 37 26
+15 \N 34 20
+14 \N 35 20
+17 \N 32 20
+16 \N 33 20
+19 200 30 20
+18 200 31 20
+119 200 30 20
+118 200 31 20
+117 200 32 20
+116 200 33 20
+115 200 34 25
+114 200 35 24
+113 200 36 23
+112 200 37 22
+111 200 38 21
+110 200 39 20
+
+-- !select_create_mv_base --
+11 \N 38 28
+10 \N 39 29
+13 \N 36 27
+12 \N 37 26
+15 \N 34 20
+14 \N 35 20
+17 \N 32 20
+16 \N 33 20
+19 200 30 20
+18 200 31 20
+119 200 30 20
+118 200 31 20
+117 200 32 20
+116 200 33 20
+115 200 34 25
+114 200 35 24
+113 200 36 23
+112 200 37 22
+111 200 38 21
+110 200 39 20
+211 200 38 21
+210 200 39 20
+
+-- !select_create_mv_mv --
+10 39
+11 38
+12 37
+13 36
+14 35
+15 34
+16 33
+17 32
+18 31
+19 30
+118 31
+119 30
+116 33
+117 32
+114 35
+115 34
+112 37
+113 36
+110 39
+111 38
+210 39
+211 38
+
+-- !select_create_rollup_base --
+11 \N 38 28
+10 \N 39 29
+13 \N 36 27
+12 \N 37 26
+15 \N 34 20
+14 \N 35 20
+17 \N 32 20
+16 \N 33 20
+19 200 30 20
+18 200 31 20
+119 200 30 20
+118 200 31 20
+117 200 32 20
+116 200 33 20
+115 200 34 25
+114 200 35 24
+113 200 36 23
+112 200 37 22
+111 200 38 21
+110 200 39 20
+211 200 38 21
+210 200 39 20
+311 200 38 21
+310 200 39 20
+
+-- !select_create_rollup_roll --
+\N 10 29
+\N 11 28
+\N 12 26
+\N 13 27
+\N 14 20
+\N 15 20
+\N 16 20
+\N 17 20
+200 18 20
+200 19 20
+200 118 20
+200 119 20
+200 116 20
+200 117 20
+200 114 24
+200 115 25
+200 112 22
+200 113 23
+200 110 20
+200 111 21
+200 210 20
+200 211 21
+200 310 20
+200 311 21
+
+-- !select_add_partition --
+10011 200 38 21
+10010 200 39 20
+11 \N 38 28
+10 \N 39 29
+13 \N 36 27
+12 \N 37 26
+15 \N 34 20
+14 \N 35 20
+17 \N 32 20
+16 \N 33 20
+19 200 30 20
+18 200 31 20
+119 200 30 20
+118 200 31 20
+117 200 32 20
+116 200 33 20
+115 200 34 25
+114 200 35 24
+113 200 36 23
+112 200 37 22
+111 200 38 21
+110 200 39 20
+211 200 38 21
+210 200 39 20
+311 200 38 21
+310 200 39 20
+
+-- !select_truncate --
+13 \N 36 27
+12 \N 37 26
+11 \N 38 28
+10 \N 39 29
+
+-- !select_rollup_base --
+12 22 31 41 51
+11 21 32 42 52
+
+-- !select_rollup_roll --
+21 11 42 32
+22 12 41 31
+
+-- !select_rollup_base_sc --
+12 22 31 41 51
+11 21 32 42 52
+
+-- !select_rollup_roll_sc --
+21 11 42 32
+22 12 41 31
+
+-- !select_rollup_base_sc1 --
+12 22 31 41 51
+11 21 32 42 52
+14 24 33 43 53
+13 23 34 44 54
+
+-- !select_rollup_roll_sc1 --
+21 11 42 32
+22 12 41 31
+23 13 44 34
+24 14 43 33
+
+-- !select_restore_base2 --
+12 22 31 41 51
+11 21 32 42 52
+14 24 33 43 53
+13 23 34 44 54
+16 26 33 43 53
+15 25 34 44 54
+
+-- !select_restore_roll2 --
+21 11 42 32
+22 12 41 31
+23 13 44 34
+24 14 43 33
+25 15 44 34
+26 16 43 33
+
+-- !select_restore_base --
+12 22 31 41 51
+11 21 32 42 52
+14 24 33 43 53
+13 23 34 44 54
+
+-- !select_restore_roll --
+21 11 42 32
+22 12 41 31
+23 13 44 34
+24 14 43 33
+
+-- !select_restore_base1 --
+12 22 31 41 51
+11 21 32 42 52
+14 24 33 43 53
+13 23 34 44 54
+18 28 33 43 53
+17 27 34 44 54
+
+-- !select_restore_roll1 --
+21 11 42 32
+22 12 41 31
+23 13 44 34
+24 14 43 33
+27 17 44 34
+28 18 43 33
+
+-- !select_restore_base2 --
+12 22 31 41 51
+11 21 32 42 52
+14 24 33 43 53
+13 23 34 44 54
+
+-- !select_restore_roll2 --
+21 11 42 32
+22 12 41 31
+23 13 44 34
+24 14 43 33
+
+-- !select_restore_base3 --
+12 22 31 41 51
+11 21 32 42 52
+14 24 33 43 53
+13 23 34 44 54
+18 28 33 43 53
+17 27 34 44 54
+
+-- !select_restore_roll4 --
+21 11 42 32
+22 12 41 31
+23 13 44 34
+24 14 43 33
+27 17 44 34
+28 18 43 33
+
diff --git a/regression-test/suites/unique_with_mow_c_p0/test_create_table.groovy b/regression-test/suites/unique_with_mow_c_p0/test_create_table.groovy
index 8cd7cb6d198..0abb7d8f1a9 100644
--- a/regression-test/suites/unique_with_mow_c_p0/test_create_table.groovy
+++ b/regression-test/suites/unique_with_mow_c_p0/test_create_table.groovy
@@ -81,7 +81,7 @@ suite("test_create_table") {
"enable_unique_key_merge_on_write" = "true"
);
"""
- exception "Key cluster column[c_addresses] doesn't exist"
+ exception "Cluster key column[c_addresses] doesn't exist"
}
// mow unique table with duplicate cluster keys
diff --git a/regression-test/suites/unique_with_mow_c_p0/test_schema_change.groovy b/regression-test/suites/unique_with_mow_c_p0/test_schema_change.groovy
index 9abee82f7c0..37c96e79a6b 100644
--- a/regression-test/suites/unique_with_mow_c_p0/test_schema_change.groovy
+++ b/regression-test/suites/unique_with_mow_c_p0/test_schema_change.groovy
@@ -48,7 +48,7 @@ suite("test_schema_change") {
`min_dwell_time` INT DEFAULT "99999" COMMENT "用户最小停留时间")
UNIQUE KEY(`user_id`, `date`, `city`, `age`, `sex`)
CLUSTER BY(`cost`, `comment`)
- DISTRIBUTED BY HASH(`user_id`)
+ DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES ( "replication_num" = "1",
"enable_unique_key_merge_on_write" = "true"
);
@@ -237,12 +237,12 @@ suite("test_schema_change") {
}
// 5. modify column order should success (Temporarily throw exception)
- test {
+ /*test {
sql """
alter table ${tableName} ORDER BY (`user_id`, `date`, `city`, `age`, `sex`, `max_dwell_time`, `comment`, `min_dwell_time`, `last_visit_date_not_null`, `cost`, `score`, `last_update_date`);
"""
exception "Can not modify column order in Unique data model table"
- }
+ }*/
/*assertTrue(getAlterTableState(), "alter column order should success");
{
sql """ INSERT INTO ${tableName}
diff --git a/regression-test/suites/unique_with_mow_c_p0/test_schema_change_ck.groovy b/regression-test/suites/unique_with_mow_c_p0/test_schema_change_ck.groovy
new file mode 100644
index 00000000000..840badb6310
--- /dev/null
+++ b/regression-test/suites/unique_with_mow_c_p0/test_schema_change_ck.groovy
@@ -0,0 +1,262 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_schema_change_ck") {
+ def db = "regression_test_unique_with_mow_c_p0"
+ def tableName = "test_schema_change_ck"
+
+ def getAlterTableState = {
+ waitForSchemaChangeDone {
+ sql """ SHOW ALTER TABLE COLUMN WHERE tablename='${tableName}'
ORDER BY createtime DESC LIMIT 1 """
+ time 600
+ }
+ return true
+ }
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ test {
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `c1` int(11) NULL,
+ `c2` int(11) NULL,
+ `c3` int(11) NULL
+ ) unique KEY(`c1`)
+ cluster by(`c3`, `c2`)
+ DISTRIBUTED BY HASH(`c1`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "disable_auto_compaction" = "true",
+ "light_schema_change" = "false"
+ );
+ """
+ exception "Unique merge-on-write table with cluster keys must enable
light schema change"
+ }
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `c1` int(11) NULL,
+ `c2` int(11) NULL,
+ `c3` int(11) NULL
+ ) unique KEY(`c1`)
+ cluster by(`c3`, `c2`)
+ PARTITION BY RANGE(`c1`)
+ (
+ PARTITION `p_10000` VALUES [("0"), ("10000"))
+ )
+ DISTRIBUTED BY HASH(`c1`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "disable_auto_compaction" = "true"
+ );
+ """
+
+ sql """ INSERT INTO ${tableName} VALUES (11, 28, 38), (10, 29, 39) """
+ qt_select_original """select * from ${tableName}"""
+
+ /****** add value column ******/
+ // after cluster key
+ sql """ alter table ${tableName} ADD column c4 int(11) after c3; """
+ assertTrue(getAlterTableState(), "add column should success")
+ sql """ INSERT INTO ${tableName}(c1, c2, c3, c4) VALUES (13, 27, 36, 40),
(12, 26, 37, 40) """
+ qt_select_add_c4 """select * from ${tableName}"""
+
+ // before cluster key
+ sql """ alter table ${tableName} ADD column c5 int(11) after c1; """
+ assertTrue(getAlterTableState(), "add column should success")
+ sql """ INSERT INTO ${tableName}(c1, c2, c3, c4, c5) VALUES (15, 20, 34,
40, 50), (14, 20, 35, 40, 50) """
+ qt_select_add_c5 """select * from ${tableName}"""
+
+ // in the middle of cluster key
+ sql """ alter table ${tableName} ADD column c6 int(11) after c2; """
+ assertTrue(getAlterTableState(), "add column should success")
+ sql """ INSERT INTO ${tableName}(c1, c2, c3, c4, c5, c6) VALUES (17, 20,
32, 40, 50, 60), (16, 20, 33, 40, 50, 60) """
+ qt_select_add_c6 """select * from ${tableName}"""
+
+ /****** add key column ******/
+ sql """ alter table ${tableName} ADD column k2 int(11) key after c1; """
+ assertTrue(getAlterTableState(), "add column should success")
+ sql """ INSERT INTO ${tableName}(c1, c2, c3, k2) VALUES (19, 20, 30, 200),
(18, 20, 31, 200) """
+ qt_select_add_k2 """select * from ${tableName}"""
+
+ /****** TODO add cluster key column is not supported ******/
+
+ /****** drop value column ******/
+ sql """ alter table ${tableName} drop column c4; """
+ assertTrue(getAlterTableState(), "drop column should success")
+ sql """ INSERT INTO ${tableName}(c1, c2, c3, k2) VALUES (119, 20, 30,
200), (118, 20, 31, 200) """
+ qt_select_drop_c4 """select * from ${tableName}"""
+
+ sql """ alter table ${tableName} drop column c5; """
+ assertTrue(getAlterTableState(), "drop column should success")
+ sql """ INSERT INTO ${tableName}(c1, c2, c3, k2) VALUES (117, 20, 32,
200), (116, 20, 33, 200) """
+ qt_select_drop_c5 """select * from ${tableName}"""
+
+ sql """ alter table ${tableName} drop column c6; """
+ assertTrue(getAlterTableState(), "drop column should success")
+ sql """ INSERT INTO ${tableName}(c1, c2, c3, k2) VALUES (115, 25, 34,
200), (114, 24, 35, 200) """
+ qt_select_drop_c6 """select * from ${tableName}"""
+
+ /****** drop key column ******/
+ test {
+ sql """ alter table ${tableName} drop column k2; """
+ exception "Can not drop key column in Unique data model table"
+ }
+
+ /****** TODO does not support drop cluster key ******/
+ test {
+ sql """ alter table ${tableName} drop column c3; """
+ exception "Can not drop cluster key column in Unique data model table"
+ }
+
+ /****** reorder ******/
+ sql """ alter table ${tableName} order by(c1, k2, c3, c2); """
+ assertTrue(getAlterTableState(), "reorder should success")
+ sql """ INSERT INTO ${tableName}(c1, c2, c3, k2) VALUES (113, 23, 36,
200), (112, 22, 37, 200) """
+ qt_select_reorder """select * from ${tableName}"""
+
+ /****** modify key column data type ******/
+ sql """ alter table ${tableName} modify column k2 BIGINT key; """
+ assertTrue(getAlterTableState(), "modify should success")
+ sql """ INSERT INTO ${tableName}(c1, c2, c3, k2) VALUES (111, 21, 38,
200), (110, 20, 39, 200) """
+ qt_select_modify_k2 """select * from ${tableName}"""
+
+ /****** TODO does not support modify cluster key column data type ******/
+ test {
+ sql """ alter table ${tableName} modify column c2 BIGINT; """
+ exception "Can not modify cluster key column"
+ }
+
+ /****** create mv ******/
+ def mv_name = "k2_c3"
+ sql """DROP MATERIALIZED VIEW IF EXISTS ${mv_name}"""
+ createMV """ create materialized view ${mv_name} as select c1, c3 from
${tableName}; """
+ sql """ INSERT INTO ${tableName}(c1, c2, c3, k2) VALUES (211, 21, 38,
200), (210, 20, 39, 200) """
+ qt_select_create_mv_base """select * from ${tableName}"""
+ qt_select_create_mv_mv """select c1, c3 from ${tableName}"""
+
+ /****** create rollup ******/
+ sql """ alter table ${tableName} ADD ROLLUP r1(k2, c1, c2); """
+ waitForSchemaChangeDone {
+ sql """show alter table rollup where tablename='${tableName}' order by
createtime desc limit 1"""
+ time 600
+ }
+ sql """ INSERT INTO ${tableName}(c1, c2, c3, k2) VALUES (311, 21, 38,
200), (310, 20, 39, 200) """
+ qt_select_create_rollup_base """select * from ${tableName}"""
+ qt_select_create_rollup_roll """select k2, c1, c2 from ${tableName}"""
+
+ /****** add partition ******/
+ sql "ALTER TABLE ${tableName} ADD PARTITION p_20000 VALUES [('10000'),
('20000'));"
+ for (int i = 0; i < 10; i++) {
+ List<List<Object>> partitions = sql "show partitions from
${tableName};"
+ logger.info("partitions: ${partitions}")
+ if (partitions.size() < 2 && i < 10) {
+ sleep(50)
+ continue
+ }
+ assertEquals(partitions.size(), 2)
+ }
+ sql """ INSERT INTO ${tableName}(c1, c2, c3, k2) VALUES (10011, 21, 38,
200), (10010, 20, 39, 200) """
+ qt_select_add_partition """select * from ${tableName}"""
+
+ /****** one sql contain multi column changes ******/
+
+ /****** truncate table ******/
+ sql """ TRUNCATE TABLE ${tableName} """
+ sql """ INSERT INTO ${tableName}(c1, c2, c3) VALUES (11, 28, 38), (10, 29,
39), (12, 26, 37), (13, 27, 36) """
+ qt_select_truncate """select * from ${tableName}"""
+
+ /****** create table with rollup ******/
+ tableName = tableName + "_rollup"
+ sql """ DROP TABLE IF EXISTS ${tableName}; """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k1` int(11) NULL,
+ `k2` int(11) NULL,
+ `c3` int(11) NULL,
+ `c4` int(11) NULL,
+ `c5` int(11) NULL
+ ) unique KEY(`k1`, `k2`)
+ cluster by(`c4`, `c5`)
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+ ROLLUP (
+ r1 (k2, k1, c4, c3)
+ )
+ PROPERTIES (
+ "replication_num" = "1",
+ "disable_auto_compaction" = "true"
+ );
+ """
+ sql """ INSERT INTO ${tableName} VALUES (11, 21, 32, 42, 52), (12, 22, 31,
41, 51); """
+ qt_select_rollup_base """select * from ${tableName};"""
+ qt_select_rollup_roll """select k2, k1, c4, c3 from ${tableName};"""
+
+ /****** specify index, not base index ******/
+ sql """ ALTER TABLE ${tableName} ORDER BY(k2, k1, c3, c4) from r1; """
+ assertTrue(getAlterTableState(), "reorder rollup should success")
+ qt_select_rollup_base_sc """select * from ${tableName};"""
+ qt_select_rollup_roll_sc """select k2, k1, c4, c3 from ${tableName};"""
+ sql """ INSERT INTO ${tableName} VALUES (13, 23, 34, 44, 54), (14, 24, 33,
43, 53); """
+ qt_select_rollup_base_sc1 """select * from ${tableName};"""
+ qt_select_rollup_roll_sc1 """select k2, k1, c4, c3 from ${tableName};"""
+
+ /****** backup restore ******/
+ if (!isCloudMode()) {
+ def repoName = "repo_" + UUID.randomUUID().toString().replace("-", "")
+ def backup = tableName + "_bak"
+ def syncer = getSyncer()
+ syncer.createS3Repository(repoName)
+ def result = sql """ show tablets from ${tableName}; """
+ logger.info("tablets 0: ${result}")
+
+ // backup
+ sql """ BACKUP SNAPSHOT ${context.dbName}.${backup} TO ${repoName} ON
(${tableName}) properties("type"="full"); """
+ syncer.waitSnapshotFinish()
+ def snapshot = syncer.getSnapshotTimestamp(repoName, backup)
+ assertTrue(snapshot != null)
+ sql """ INSERT INTO ${tableName} VALUES (15, 25, 34, 44, 54), (16, 26,
33, 43, 53); """
+ qt_select_restore_base2 """select * from ${tableName};"""
+ qt_select_restore_roll2 """select k2, k1, c4, c3 from ${tableName};"""
+
+ // restore
+ logger.info(""" RESTORE SNAPSHOT ${context.dbName}.${backup} FROM
`${repoName}` ON (`${tableName}`) PROPERTIES ("backup_timestamp" =
"${snapshot}","replication_num" = "1" ) """)
+ sql """ RESTORE SNAPSHOT ${context.dbName}.${backup} FROM
`${repoName}` ON (`${tableName}`) PROPERTIES ("backup_timestamp" =
"${snapshot}","replication_num" = "1" ) """
+ syncer.waitAllRestoreFinish(context.dbName)
+ result = sql """ show tablets from ${tableName}; """
+ logger.info("tablets 1: ${result}")
+ qt_select_restore_base """select * from ${tableName};"""
+ qt_select_restore_roll """select k2, k1, c4, c3 from ${tableName};"""
+ sql """ INSERT INTO ${tableName} VALUES (17, 27, 34, 44, 54), (18, 28,
33, 43, 53); """
+ qt_select_restore_base1 """select * from ${tableName};"""
+ qt_select_restore_roll1 """select k2, k1, c4, c3 from ${tableName};"""
+
+ // restore
+ sql """ drop table ${tableName}; """
+ sql """ RESTORE SNAPSHOT ${context.dbName}.${backup} FROM
`${repoName}` ON (`${tableName}`) PROPERTIES ("backup_timestamp" =
"${snapshot}","replication_num" = "1" ) """
+ syncer.waitAllRestoreFinish(context.dbName)
+ result = sql """ show tablets from ${tableName}; """
+ logger.info("tablets 2: ${result}")
+ qt_select_restore_base2 """select * from ${tableName};"""
+ qt_select_restore_roll2 """select k2, k1, c4, c3 from ${tableName};"""
+ sql """ INSERT INTO ${tableName} VALUES (17, 27, 34, 44, 54), (18, 28,
33, 43, 53); """
+ qt_select_restore_base3 """select * from ${tableName};"""
+ qt_select_restore_roll4 """select k2, k1, c4, c3 from ${tableName};"""
+
+ sql "DROP REPOSITORY `${repoName}`"
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]