ShiKaiWi commented on code in PR #286:
URL:
https://github.com/apache/incubator-horaedb-meta/pull/286#discussion_r1461739617
##########
server/cluster/metadata/cluster_metadata.go:
##########
@@ -392,6 +398,31 @@ func (c *ClusterMetadata) CreateTable(ctx context.Context,
request CreateTableRe
return ret, nil
}
+func (c *ClusterMetadata) GetAssignTable(ctx context.Context, schemaName
string, tableName string) (storage.ShardID, bool, error) {
Review Comment:
Rename it to `GetTableAssignedShard`?
##########
server/coordinator/persist_shard_picker.go:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package coordinator
+
+import (
+ "context"
+
+ "github.com/apache/incubator-horaedb-meta/server/cluster/metadata"
+ "github.com/apache/incubator-horaedb-meta/server/storage"
+ "github.com/pkg/errors"
+)
+
+type PersistShardPicker struct {
+ cluster *metadata.ClusterMetadata
+ internal ShardPicker
+}
+
+func NewPersistShardPicker(cluster *metadata.ClusterMetadata, internal
ShardPicker) *PersistShardPicker {
+ return &PersistShardPicker{cluster: cluster, internal: internal}
+}
+
+func (p *PersistShardPicker) PickShards(ctx context.Context, snapshot
metadata.Snapshot, schemaName string, tableNames []string)
(map[string]storage.ShardNode, error) {
+ result := map[string]storage.ShardNode{}
+
+ shardNodeMap := map[storage.ShardID]storage.ShardNode{}
+ for _, shardNode := range snapshot.Topology.ClusterView.ShardNodes {
+ shardNodeMap[shardNode.ID] = shardNode
+ }
+
+ // If table assign has been created, just reuse it.
+ for i := 0; i < len(tableNames); i++ {
+ shardID, exists, err := p.cluster.GetAssignTable(ctx,
schemaName, tableNames[i])
+ if err != nil {
+ return map[string]storage.ShardNode{}, err
+ }
+ if exists {
+ result[tableNames[i]] = shardNodeMap[shardID]
+ }
+ }
+
+ if len(result) == len(tableNames) {
+ return result, nil
+ }
+
+ if len(result) != len(tableNames) && len(result) != 0 {
Review Comment:
Maybe we should assign the new shards to the unassigned tables rather than
just returnning an error.
##########
server/cluster/metadata/topology_manager.go:
##########
@@ -44,6 +44,14 @@ type TopologyManager interface {
AddTable(ctx context.Context, shardID storage.ShardID, latestVersion
uint64, tables []storage.Table) error
// RemoveTable remove table on target shards from cluster topology.
RemoveTable(ctx context.Context, shardID storage.ShardID, latestVersion
uint64, tableIDs []storage.TableID) error
+ // GetTableShard get the shardID of the shard where the table is
located.
+ GetTableShardID(ctx context.Context, table storage.Table)
(storage.ShardID, bool)
+ // AssignTable persistent table shard mapping, it is used to store
assign results and make the table creation idempotent.
+ AssignTableToShard(ctx context.Context, schemaID storage.SchemaID,
tableName string, shardID storage.ShardID) error
+ // GetAssignTableResult get table assign result.
+ GetAssignTableShard(ctx context.Context, schemaID storage.SchemaID,
tableName string) (storage.ShardID, bool)
Review Comment:
```suggestion
GetTableAssignedShard(ctx context.Context, schemaID storage.SchemaID,
tableName string) (storage.ShardID, bool)
```
##########
server/cluster/metadata/cluster_metadata.go:
##########
@@ -392,6 +398,31 @@ func (c *ClusterMetadata) CreateTable(ctx context.Context,
request CreateTableRe
return ret, nil
}
+func (c *ClusterMetadata) GetAssignTable(ctx context.Context, schemaName
string, tableName string) (storage.ShardID, bool, error) {
+ schema, exists := c.tableManager.GetSchema(schemaName)
+ if !exists {
+ return 0, false, errors.WithMessagef(ErrSchemaNotFound, "schema
%s not found", schemaName)
+ }
+ shardIDs, exists := c.topologyManager.GetAssignTableShard(ctx,
schema.ID, tableName)
+ return shardIDs, exists, nil
+}
+
+func (c *ClusterMetadata) AssignTable(ctx context.Context, schemaName string,
tableName string, shardID storage.ShardID) error {
+ schema, exists := c.tableManager.GetSchema(schemaName)
+ if !exists {
+ return errors.WithMessagef(ErrSchemaNotFound, "schema %s not
found", schemaName)
+ }
+ return c.topologyManager.AssignTableToShard(ctx, schema.ID, tableName,
shardID)
+}
+
+func (c *ClusterMetadata) DeleteAssignTable(ctx context.Context, schemaName
string, tableName string) error {
Review Comment:
```suggestion
func (c *ClusterMetadata) DeleteTableAssignedShard(ctx context.Context,
schemaName string, tableName string) error {
```
##########
server/cluster/metadata/cluster_metadata.go:
##########
@@ -392,6 +398,31 @@ func (c *ClusterMetadata) CreateTable(ctx context.Context,
request CreateTableRe
return ret, nil
}
+func (c *ClusterMetadata) GetAssignTable(ctx context.Context, schemaName
string, tableName string) (storage.ShardID, bool, error) {
+ schema, exists := c.tableManager.GetSchema(schemaName)
+ if !exists {
+ return 0, false, errors.WithMessagef(ErrSchemaNotFound, "schema
%s not found", schemaName)
+ }
+ shardIDs, exists := c.topologyManager.GetAssignTableShard(ctx,
schema.ID, tableName)
+ return shardIDs, exists, nil
+}
+
+func (c *ClusterMetadata) AssignTable(ctx context.Context, schemaName string,
tableName string, shardID storage.ShardID) error {
Review Comment:
```suggestion
func (c *ClusterMetadata) AssignTableToShard(ctx context.Context, schemaName
string, tableName string, shardID storage.ShardID) error {
```
##########
server/etcdutil/util.go:
##########
@@ -118,11 +119,40 @@ func Scan(ctx context.Context, client *clientv3.Client,
startKey, endKey string,
}
}
- // Check whether the keys is exhausted.
+ // Check whether the keys are exhausted.
if len(resp.Kvs) < batchSize {
return nil
}
lastKeyInPrevBatch = string(resp.Kvs[len(resp.Kvs)-1].Key)
}
}
+
+func ScanWithPrefix(ctx context.Context, client *clientv3.Client, prefix
string, batchSize int, do func(key string, val []byte) error) error {
+ rangeEnd := clientv3.GetPrefixRangeEnd(prefix)
+
+ for {
+ resp, err := client.Get(ctx, prefix,
clientv3.WithRange(rangeEnd), clientv3.WithLimit(int64(batchSize)))
+ if err != nil {
+ return ErrEtcdKVGet.WithCause(err)
+ }
+ // Check whether the keys are exhausted.
+ if len(resp.Kvs) == 0 {
+ return nil
+ }
+
+ for _, item := range resp.Kvs {
+ err := do(string(item.Key), item.Value)
+ if err != nil {
+ return err
+ }
+ }
+
+ rangeEnd = string(resp.Kvs[len(resp.Kvs)-1].Key)
+ }
+}
+
+// GetLastPathSegment get
Review Comment:
Remove this verbose comment? And is it necessary to be public?
Remember to add a unit test for it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]