ShiKaiWi commented on code in PR #286:
URL: 
https://github.com/apache/incubator-horaedb-meta/pull/286#discussion_r1461739617


##########
server/cluster/metadata/cluster_metadata.go:
##########
@@ -392,6 +398,31 @@ func (c *ClusterMetadata) CreateTable(ctx context.Context, 
request CreateTableRe
        return ret, nil
 }
 
+func (c *ClusterMetadata) GetAssignTable(ctx context.Context, schemaName 
string, tableName string) (storage.ShardID, bool, error) {

Review Comment:
   Rename it to `GetTableAssignedShard`?



##########
server/coordinator/persist_shard_picker.go:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package coordinator
+
+import (
+       "context"
+
+       "github.com/apache/incubator-horaedb-meta/server/cluster/metadata"
+       "github.com/apache/incubator-horaedb-meta/server/storage"
+       "github.com/pkg/errors"
+)
+
+type PersistShardPicker struct {
+       cluster  *metadata.ClusterMetadata
+       internal ShardPicker
+}
+
+func NewPersistShardPicker(cluster *metadata.ClusterMetadata, internal 
ShardPicker) *PersistShardPicker {
+       return &PersistShardPicker{cluster: cluster, internal: internal}
+}
+
+func (p *PersistShardPicker) PickShards(ctx context.Context, snapshot 
metadata.Snapshot, schemaName string, tableNames []string) 
(map[string]storage.ShardNode, error) {
+       result := map[string]storage.ShardNode{}
+
+       shardNodeMap := map[storage.ShardID]storage.ShardNode{}
+       for _, shardNode := range snapshot.Topology.ClusterView.ShardNodes {
+               shardNodeMap[shardNode.ID] = shardNode
+       }
+
+       // If table assign has been created, just reuse it.
+       for i := 0; i < len(tableNames); i++ {
+               shardID, exists, err := p.cluster.GetAssignTable(ctx, 
schemaName, tableNames[i])
+               if err != nil {
+                       return map[string]storage.ShardNode{}, err
+               }
+               if exists {
+                       result[tableNames[i]] = shardNodeMap[shardID]
+               }
+       }
+
+       if len(result) == len(tableNames) {
+               return result, nil
+       }
+
+       if len(result) != len(tableNames) && len(result) != 0 {

Review Comment:
   Maybe we should assign the new shards to the unassigned tables rather than 
just returning an error.



##########
server/cluster/metadata/topology_manager.go:
##########
@@ -44,6 +44,14 @@ type TopologyManager interface {
        AddTable(ctx context.Context, shardID storage.ShardID, latestVersion 
uint64, tables []storage.Table) error
        // RemoveTable remove table on target shards from cluster topology.
        RemoveTable(ctx context.Context, shardID storage.ShardID, latestVersion 
uint64, tableIDs []storage.TableID) error
+       // GetTableShard get the shardID of the shard where the table is 
located.
+       GetTableShardID(ctx context.Context, table storage.Table) 
(storage.ShardID, bool)
+       // AssignTable persistent table shard mapping, it is used to store 
assign results and make the table creation idempotent.
+       AssignTableToShard(ctx context.Context, schemaID storage.SchemaID, 
tableName string, shardID storage.ShardID) error
+       // GetAssignTableResult get table assign result.
+       GetAssignTableShard(ctx context.Context, schemaID storage.SchemaID, 
tableName string) (storage.ShardID, bool)

Review Comment:
   ```suggestion
        GetTableAssignedShard(ctx context.Context, schemaID storage.SchemaID, 
tableName string) (storage.ShardID, bool)
   ```



##########
server/cluster/metadata/cluster_metadata.go:
##########
@@ -392,6 +398,31 @@ func (c *ClusterMetadata) CreateTable(ctx context.Context, 
request CreateTableRe
        return ret, nil
 }
 
+func (c *ClusterMetadata) GetAssignTable(ctx context.Context, schemaName 
string, tableName string) (storage.ShardID, bool, error) {
+       schema, exists := c.tableManager.GetSchema(schemaName)
+       if !exists {
+               return 0, false, errors.WithMessagef(ErrSchemaNotFound, "schema 
%s not found", schemaName)
+       }
+       shardIDs, exists := c.topologyManager.GetAssignTableShard(ctx, 
schema.ID, tableName)
+       return shardIDs, exists, nil
+}
+
+func (c *ClusterMetadata) AssignTable(ctx context.Context, schemaName string, 
tableName string, shardID storage.ShardID) error {
+       schema, exists := c.tableManager.GetSchema(schemaName)
+       if !exists {
+               return errors.WithMessagef(ErrSchemaNotFound, "schema %s not 
found", schemaName)
+       }
+       return c.topologyManager.AssignTableToShard(ctx, schema.ID, tableName, 
shardID)
+}
+
+func (c *ClusterMetadata) DeleteAssignTable(ctx context.Context, schemaName 
string, tableName string) error {

Review Comment:
   ```suggestion
   func (c *ClusterMetadata) DeleteTableAssignedShard(ctx context.Context, 
schemaName string, tableName string) error {
   ```



##########
server/cluster/metadata/cluster_metadata.go:
##########
@@ -392,6 +398,31 @@ func (c *ClusterMetadata) CreateTable(ctx context.Context, 
request CreateTableRe
        return ret, nil
 }
 
+func (c *ClusterMetadata) GetAssignTable(ctx context.Context, schemaName 
string, tableName string) (storage.ShardID, bool, error) {
+       schema, exists := c.tableManager.GetSchema(schemaName)
+       if !exists {
+               return 0, false, errors.WithMessagef(ErrSchemaNotFound, "schema 
%s not found", schemaName)
+       }
+       shardIDs, exists := c.topologyManager.GetAssignTableShard(ctx, 
schema.ID, tableName)
+       return shardIDs, exists, nil
+}
+
+func (c *ClusterMetadata) AssignTable(ctx context.Context, schemaName string, 
tableName string, shardID storage.ShardID) error {

Review Comment:
   ```suggestion
   func (c *ClusterMetadata) AssignTableToShard(ctx context.Context, schemaName 
string, tableName string, shardID storage.ShardID) error {
   ```



##########
server/etcdutil/util.go:
##########
@@ -118,11 +119,40 @@ func Scan(ctx context.Context, client *clientv3.Client, 
startKey, endKey string,
                        }
                }
 
-               // Check whether the keys is exhausted.
+               // Check whether the keys are exhausted.
                if len(resp.Kvs) < batchSize {
                        return nil
                }
 
                lastKeyInPrevBatch = string(resp.Kvs[len(resp.Kvs)-1].Key)
        }
 }
+
+func ScanWithPrefix(ctx context.Context, client *clientv3.Client, prefix 
string, batchSize int, do func(key string, val []byte) error) error {
+       rangeEnd := clientv3.GetPrefixRangeEnd(prefix)
+
+       for {
+               resp, err := client.Get(ctx, prefix, 
clientv3.WithRange(rangeEnd), clientv3.WithLimit(int64(batchSize)))
+               if err != nil {
+                       return ErrEtcdKVGet.WithCause(err)
+               }
+               // Check whether the keys are exhausted.
+               if len(resp.Kvs) == 0 {
+                       return nil
+               }
+
+               for _, item := range resp.Kvs {
+                       err := do(string(item.Key), item.Value)
+                       if err != nil {
+                               return err
+                       }
+               }
+
+               rangeEnd = string(resp.Kvs[len(resp.Kvs)-1].Key)
+       }
+}
+
+// GetLastPathSegment get

Review Comment:
   Remove this verbose comment? And does this function need to be public?
   
   Remember to add a unit test for it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to