OleksiienkoMykyta commented on code in PR #1664:
URL: 
https://github.com/apache/cassandra-gocql-driver/pull/1664#discussion_r1987457168


##########
session.go:
##########
@@ -553,6 +562,12 @@ func (s *Session) KeyspaceMetadata(keyspace string) 
(*KeyspaceMetadata, error) {
        return s.schemaDescriber.getSchema(keyspace)
 }
 
+// ClusterMetadata returns the cluster metadata.
+// The returned value is a snapshot of the metadata at the time of the call. 
Do not store for later reuse.

Review Comment:
   We should provide more details to the user here, it's a potential weak spot. 
Something like:
   ``` 
   // ClusterMetadata returns a snapshot of the current cluster metadata at the 
time of the call.
   // The metadata may change over time, so storing the returned pointer for 
later reuse can lead to inconsistencies.
   // Potential Data Race Risk:
   // If metaMngr is modified by another goroutine while being read, a data 
race may occur.
   ```



##########
cluster_metadata.go:
##########
@@ -0,0 +1,180 @@
+package gocql
+
+import (
+       "sync"
+       "sync/atomic"
+)
+
+// ClusterMetadata holds metadata about cluster topology.
+// It is used inside atomic.Value and shallow copies are used when replacing 
it,
+// so fields should not be modified in-place. Instead, to modify a field a 
copy of the field should be made
+// and the pointer in ClusterMetadata updated to point to the new value.
+type ClusterMetadata struct {
+       // replicas is map[keyspace]map[Token]hosts
+       replicas  map[string]tokenRingReplicas
+       tokenRing *TokenRing
+}
+
+// TokenRing returns the token ring.
+// Please note that the token ring is only available if at least one cluster 
node is known and up.
+// Several [ClusterConfig] parameters can affect the availability or 
reliability of the token ring:
+// * DisableInitialHostLookup will disable host discovery and therefore the 
token ring availability.
+// * Events.DisableNodeStatusEvents will turn off processing of STATUS_CHANGE 
events,
+//   therefore the token ring will not be updated in response to host UP/DOWN 
events.
+// * Events.DisableTopologyEvents will turn off processing of TOPOLOGY_CHANGE 
events,
+//      therefore the token ring will not be updated in response to cluster 
topology changes.
+func (m *ClusterMetadata) TokenRing() *TokenRing {
+       return m.tokenRing
+}
+
+// resetTokenRing creates a new TokenRing.
+// It must be called with t.mu locked.
+func (m *ClusterMetadata) resetTokenRing(partitioner string, hosts 
[]*HostInfo, logger StdLogger) {
+       if partitioner == "" {
+               // partitioner not yet set
+               return
+       }
+
+       // create a new Token ring
+       tokenRing, err := newTokenRing(partitioner, hosts)
+       if err != nil {
+               logger.Printf("Unable to update the token ring due to error: 
%s", err)
+               return
+       }
+
+       // replace the Token ring
+       m.tokenRing = tokenRing
+}
+
+// clusterMetadataManager manages cluster metadata.
+type clusterMetadataManager struct {
+       getKeyspaceMetadata func(keyspace string) (*KeyspaceMetadata, error)
+       getKeyspaceName     func() string
+
+       // mu protects writes to hosts, partitioner, metadata.
+       // reads can be unlocked as long as they are not used for updating 
state later.
+       mu          sync.Mutex
+       hosts       cowHostList
+       partitioner string
+       metadata    atomic.Value // *ClusterMetadata
+
+       logger StdLogger
+}
+
+func (m *clusterMetadataManager) init(s *Session) {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+       if m.getKeyspaceMetadata != nil {
+               // Init was already called.
+               // See https://github.com/scylladb/gocql/issues/94.
+               panic("sharing token aware host selection policy between 
sessions is not supported")
+       }
+       m.getKeyspaceMetadata = s.KeyspaceMetadata
+       m.getKeyspaceName = func() string { return s.cfg.Keyspace }
+       m.logger = s.logger
+}
+
+func (m *clusterMetadataManager) keyspaceChanged(update KeyspaceUpdateEvent) {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+       meta := m.getMetadataForUpdate()
+       m.updateReplicas(meta, update.Keyspace)
+       m.metadata.Store(meta)
+}
+
+func (m *clusterMetadataManager) setPartitioner(partitioner string) {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+
+       if m.partitioner != partitioner {
+               m.partitioner = partitioner
+               meta := m.getMetadataForUpdate()
+               meta.resetTokenRing(m.partitioner, m.hosts.get(), m.logger)
+               m.updateReplicas(meta, m.getKeyspaceName())
+               m.metadata.Store(meta)
+       }
+}
+
+func (m *clusterMetadataManager) addHost(host *HostInfo) {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+
+       if m.hosts.add(host) {
+               meta := m.getMetadataForUpdate()
+               meta.resetTokenRing(m.partitioner, m.hosts.get(), m.logger)
+               m.updateReplicas(meta, m.getKeyspaceName())
+               m.metadata.Store(meta)
+       }
+}
+
+func (m *clusterMetadataManager) addHosts(hosts []*HostInfo) {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+
+       for _, host := range hosts {
+               m.hosts.add(host)
+       }
+
+       meta := m.getMetadataForUpdate()
+       meta.resetTokenRing(m.partitioner, m.hosts.get(), m.logger)
+       m.updateReplicas(meta, m.getKeyspaceName())
+       m.metadata.Store(meta)
+}
+
+func (m *clusterMetadataManager) removeHost(host *HostInfo) {

Review Comment:
   To be more consistent and help future developers, it would be nice to have 
all methods/functions commented on.



##########
token.go:
##########
@@ -202,7 +217,99 @@ func (t *tokenRing) String() string {
        return string(buf.Bytes())
 }
 
-func (t *tokenRing) GetHostForToken(token token) (host *HostInfo, endToken 
token) {
+// Tokens returns the token range corresponding to the primary replica.
+// The elements are sorted by token ascending.
+// The range for a given item starts after preceding range and ends with the 
token at the current position.
+// The end token is part of the range.
+// The lowest (i.e. index 0) range wraps around the ring (its preceding range 
is the one with the largest index).
+// You can obtain the owner host/vnode of the range by calling HostForToken 
with the end token.
+//
+// The following example constructs one TOKEN-based query for each token range:
+//     func buildTokenQueries(s *Session, t *TokenRing) []*Query {

Review Comment:
   As I understand, you just added the buildTokenQueries function to 
demonstrate how to use token-based queries. I think that this commented example 
would be enough, but the unexportable function (line 276) ```func 
buildTokenQueries(s *Session, t *TokenRing) []*Query {```  should be deleted, 
or used for testing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to