joao-r-reis commented on code in PR #1926:
URL: 
https://github.com/apache/cassandra-gocql-driver/pull/1926#discussion_r2834294680


##########
policies.go:
##########
@@ -612,7 +636,13 @@ func (t *tokenAwareHostPolicy) Pick(qry 
ExecutableStatement) NextHost {
        }
 
        token := meta.tokenRing.partitioner.Hash(routingKey)
-       ht := meta.replicas[qry.Keyspace()].replicasFor(token)
+       var ht *hostTokens
+       if ksMeta, ok := t.getSchemaMeta().keyspaceMeta[qry.Keyspace()]; ok {
+               strategy := ksMeta.placementStrategy
+               if strategy != nil {
+                       ht = 
meta.replicas[strategy.strategyKey()].replicasFor(token)
+               }

Review Comment:
   perfect 👌 



##########
metadata.go:
##########
@@ -243,95 +245,226 @@ const (
 
 // queries the cluster for schema information for a specific keyspace
 type schemaDescriber struct {
-       session *Session
-       mu      sync.Mutex
+       session         *Session
+       schemaRefresher *refreshDebouncer
+       schemaMeta      atomic.Value // *schemaMeta
+}
 
-       cache map[string]*KeyspaceMetadata
+// Constants for keyspace change events
+const (
+       KeyspaceCreated = "CREATED"
+       KeyspaceUpdated = "UPDATED"
+       KeyspaceDropped = "DROPPED"
+)
+
+type schemaMeta struct {
+       keyspaceMeta map[string]*KeyspaceMetadata
 }
 
 // creates a session bound schema describer which will query and cache
 // keyspace metadata
-func newSchemaDescriber(session *Session) *schemaDescriber {
-       return &schemaDescriber{
+func newSchemaDescriber(session *Session, schemaRefresher *refreshDebouncer) 
*schemaDescriber {
+       meta := new(schemaMeta)
+       describer := &schemaDescriber{
                session: session,
-               cache:   map[string]*KeyspaceMetadata{},
        }
+       describer.schemaMeta.Store(meta)
+       describer.schemaRefresher = schemaRefresher
+       return describer
+}
+
+func (s *schemaDescriber) getSchemaMetaForRead() *schemaMeta {
+       meta, _ := s.schemaMeta.Load().(*schemaMeta)
+       return meta
+}
+
+func (s *schemaDescriber) getSchemaMetaForUpdate() *schemaMeta {
+       meta := s.getSchemaMetaForRead()
+       metaNew := new(schemaMeta)
+       if meta != nil {
+               *metaNew = *meta
+       }
+       return metaNew
 }
 
 // returns the cached KeyspaceMetadata held by the describer for the named
 // keyspace.
 func (s *schemaDescriber) getSchema(keyspaceName string) (*KeyspaceMetadata, 
error) {
-       s.mu.Lock()
-       defer s.mu.Unlock()
+       if s.session.cfg.MetadataCacheMode == Disabled {
+               return s.fetchSchema(keyspaceName)
+       }
+       metadata, found := s.getSchemaMetaForRead().keyspaceMeta[keyspaceName]
 
-       metadata, found := s.cache[keyspaceName]
        if !found {
-               // refresh the cache for this keyspace
-               err := s.refreshSchema(keyspaceName)
-               if err != nil {
-                       return nil, err
-               }
-
-               metadata = s.cache[keyspaceName]
+               return nil, ErrKeyspaceDoesNotExist
        }
 
        return metadata, nil
 }
 
-// clears the already cached keyspace metadata
-func (s *schemaDescriber) clearSchema(keyspaceName string) {
-       s.mu.Lock()
-       defer s.mu.Unlock()
-
-       delete(s.cache, keyspaceName)
-}
-
-// forcibly updates the current KeyspaceMetadata held by the schema describer
-// for a given named keyspace.
-func (s *schemaDescriber) refreshSchema(keyspaceName string) error {
+func (s *schemaDescriber) fetchSchema(keyspaceName string) (*KeyspaceMetadata, 
error) {
        var err error
 
        // query the system keyspace for schema data
        // TODO retrieve concurrently
        keyspace, err := getKeyspaceMetadata(s.session, keyspaceName)
        if err != nil {
-               return err
+               return nil, err
        }
        tables, err := getTableMetadata(s.session, keyspaceName)
        if err != nil {
-               return err
+               return nil, err
        }
        columns, err := getColumnMetadata(s.session, keyspaceName)
        if err != nil {
-               return err
+               return nil, err
        }
        functions, err := getFunctionsMetadata(s.session, keyspaceName)
        if err != nil {
-               return err
+               return nil, err
        }
        aggregates, err := getAggregatesMetadata(s.session, keyspaceName)
        if err != nil {
-               return err
+               return nil, err
        }
        userTypes, err := getUserTypeMetadata(s.session, keyspaceName)
        if err != nil {
-               return err
+               return nil, err
        }
        materializedViews, err := getMaterializedViewsMetadata(s.session, 
keyspaceName)
        if err != nil {
-               return err
+               return nil, err
        }
 
        // organize the schema data
        compileMetadata(s.session, keyspace, tables, columns, functions, 
aggregates, userTypes,
                materializedViews)
 
-       // update the cache
-       s.cache[keyspaceName] = keyspace
+       return keyspace, nil
+}
 
+// forcibly updates the current KeyspaceMetadata held by the schema describer
+// for all the keyspaces.
+// This function is called via schemaRefresher refreshDebouncer to batch and
+// debounce schema refresh requests.
+func refreshSchemas(session *Session) error {
+       if session.cfg.MetadataCacheMode == Disabled {
+               return nil
+       }
+       session.control.awaitSchemaAgreement()
+       var err error
+       var keyspaceMeta map[string]*KeyspaceMetadata
+       // query the system keyspace for schema data
+       keyspaces, err := getAllKeyspaceMetadata(session)
+       if err != nil {
+               return err
+       }
+       var tables map[string][]TableMetadata
+       var columns map[string][]ColumnMetadata
+       var functions map[string][]FunctionMetadata
+       var aggregates map[string][]AggregateMetadata
+       var userTypes map[string][]UserTypeMetadata
+       var materializedViews map[string][]MaterializedViewMetadata
+       if session.cfg.MetadataCacheMode == Full {
+               tables, err = getAllTablesMetadata(session)
+               if err != nil {
+                       return err
+               }
+               columns, err = getAllColumnMetadata(session)
+               if err != nil {
+                       return err
+               }
+               functions, err = getAllFunctionsMetadata(session)
+               if err != nil {
+                       return err
+               }
+               aggregates, err = getAllAggregatesMetadata(session)
+               if err != nil {
+                       return err
+               }
+               userTypes, err = getAllUserTypeMetadata(session)
+               if err != nil {
+                       return err
+               }
+               materializedViews, err = 
getAllMaterializedViewsMetadata(session)
+               if err != nil {
+                       return err
+               }
+       }
+
+       // organize the schema data
+       keyspaceMeta = make(map[string]*KeyspaceMetadata)
+       sd := session.schemaDescriber
+       meta := sd.getSchemaMetaForUpdate()
+       oldKeyspaceMeta := meta.keyspaceMeta
+       var newKeyspaces []string
+       var updatedKeyspaces []string
+       var droppedKeyspaces []string
+       for keyspaceName, keyspace := range keyspaces {
+               compileMetadata(session,
+                       keyspace,
+                       tables[keyspaceName],
+                       columns[keyspaceName],
+                       functions[keyspaceName],
+                       aggregates[keyspaceName],
+                       userTypes[keyspaceName],
+                       materializedViews[keyspaceName])
+               // update the cache
+               keyspaceMeta[keyspaceName] = keyspace
+               if _, ok := oldKeyspaceMeta[keyspaceName]; !ok {
+                       newKeyspaces = append(newKeyspaces, keyspaceName)
+               } else {
+                       newStrat := keyspace.placementStrategy
+                       oldStrat := 
oldKeyspaceMeta[keyspaceName].placementStrategy
+
+                       if (newStrat == nil) != (oldStrat == nil) {
+                               updatedKeyspaces = append(updatedKeyspaces, 
keyspaceName)
+                       } else if newStrat != nil && newStrat.strategyKey() != 
oldStrat.strategyKey() {
+                               updatedKeyspaces = append(updatedKeyspaces, 
keyspaceName)
+                       }
+               }
+       }
+       droppedKeyspaces = sd.getDroppedKeyspaces(oldKeyspaceMeta, keyspaces)
+       meta.keyspaceMeta = keyspaceMeta
+       sd.schemaMeta.Store(meta)
+
+       for _, createdKeyspace := range newKeyspaces {
+               session.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: 
createdKeyspace, Change: KeyspaceCreated})
+       }
+       for _, droppedKeyspace := range droppedKeyspaces {
+               session.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: 
droppedKeyspace, Change: KeyspaceDropped})
+       }
+       for _, updatedKeyspace := range updatedKeyspaces {
+               session.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: 
updatedKeyspace, Change: KeyspaceUpdated})
+       }

Review Comment:
   I would type-check the policy to see if it implements the notifier interface, and if it does, only call the notifier method. If it doesn't, then call the regular `KeyspaceChanged` method like it's doing here.
   
   If the policy implements the notifier, technically all we care about is checking whether there was any kind of change, just so we know that we need to call the notify method. However, this computation of what actually changed (created, dropped, updated) is necessary to maintain compatibility with the old interface AND to later implement proper schema events on the public API, which is being worked on in parallel in another PR (I've asked the author of that PR to wait until your implementation here is merged).



##########
cluster.go:
##########
@@ -286,6 +294,8 @@ type ClusterConfig struct {
 
        // internal config for testing
        disableControlConn bool
+
+       MetadataCacheMode MetadataCacheMode

Review Comment:
   add `//` API docs explaining what each mode does.
   
   ```
   // MetadataCacheMode specifies whether the driver should read and parse the 
system tables to store schema metadata information. The behavior of 
Session.KeyspaceMetadata and tokenAwareHostPolicy will depend on this mode.
   //
   // Full is the default and specifies that the driver will read all the 
system tables and store all the schema metadata related to a keyspace including 
tables, functions, aggregates, UDTs and materialized views. In this mode token 
aware policies will work normally (note that token aware policy is not the 
default HostSelectionPolicy). Session.KeyspaceMetadata will only look up the 
metadata cache, it will not issue any system table query.
   //
   // KeyspaceOnly is a mode that allows token aware policies to work normally 
but prevents the driver from querying and storing all the other metadata that 
is mentioned in the Full mode's description above. Session.KeyspaceMetadata 
will continue to only look up the metadata cache but the returned object will 
have some of its fields set as `nil` (`Tables`, `Functions`, `Aggregates`, 
`MaterializedViews`, `UserTypes`).
   //
   // Disabled will completely disable this feature. Token aware policies will 
fall back to round robin and Session.KeyspaceMetadata will change its behavior 
to always query the system table (caching is completely disabled).
   ```
   
   Maybe we move the specific docs for each mode to the actual constants and 
reference them in the docs for this config field.



##########
CHANGELOG.md:
##########


Review Comment:
   can you add https://issues.apache.org/jira/browse/CASSGO-107 here as well? I just created it — I think it makes sense to document this new metadata cache system as a separate ticket. I updated the title of the PR as well.



##########
topology.go:
##########
@@ -163,6 +179,33 @@ type networkTopology struct {
        dcs map[string]int
 }
 
+func (n *networkTopology) strategyClass() string {
+       return networkTopologyStrategyClass
+}
+
+func (n *networkTopology) strategyKey() string {

Review Comment:
   Let's put the computation of the key at init time of the strategy so it happens only once, and this method only returns the string. This method will be called multiple times — at least once per query (in the `Pick` method).



##########
topology.go:
##########
@@ -163,6 +179,33 @@ type networkTopology struct {
        dcs map[string]int
 }
 
+func (n *networkTopology) strategyClass() string {
+       return networkTopologyStrategyClass
+}
+
+func (n *networkTopology) strategyKey() string {
+       // Sort datacenter names for deterministic output
+       dcs := make([]string, 0, len(n.dcs))
+       for dc := range n.dcs {
+               dcs = append(dcs, dc)
+       }
+       sort.Strings(dcs)
+
+       // Build key with sorted dc=rf pairs using strings.Builder for 
efficiency
+       var b strings.Builder
+       b.WriteString(n.strategyClass())

Review Comment:
   let's call `b.Grow(64)` right after creating the builder to pre-allocate 64 bytes; it should reduce the number of allocations quite a bit without much effort.



##########
policies.go:
##########
@@ -455,35 +457,57 @@ func (t *tokenAwareHostPolicy) IsLocal(host *HostInfo) 
bool {
 }
 
 func (t *tokenAwareHostPolicy) KeyspaceChanged(update KeyspaceUpdateEvent) {
-       t.mu.Lock()
-       defer t.mu.Unlock()
-       meta := t.getMetadataForUpdate()
-       t.updateReplicas(meta, update.Keyspace)
-       t.metadata.Store(meta)
+       if update.Change != KeyspaceDropped {
+               t.updateReplicas(update.Keyspace)
+       }
 }
 
-// updateReplicas updates replicas in clusterMeta.
-// It must be called with t.mu mutex locked.
+// updateReplicas updates replicas in clusterMeta for keyspace schema changes.
 // meta must not be nil and it's replicas field will be updated.
-func (t *tokenAwareHostPolicy) updateReplicas(meta *clusterMeta, keyspace 
string) {
-       newReplicas := make(map[string]tokenRingReplicas, len(meta.replicas))
-
+func (t *tokenAwareHostPolicy) updateReplicas(keyspace string) {

Review Comment:
   let's remove this method and make `KeyspaceChanged(update KeyspaceUpdateEvent)` a NOOP on this policy, basically bringing back the internal notifier interface that you had before.
   
   This `updateReplicas` is basically the implementation that an external policy would have to provide if it wants to replicate what our token aware policy is doing, but we can cheat a little bit and make it simpler (and more performant at init) for our policy by just doing a full update every time, so the `updateAllReplicas` method that you added is enough.



##########
topology.go:
##########
@@ -127,10 +130,23 @@ func getStrategy(ks *KeyspaceMetadata, logger 
StructuredLogger) placementStrateg
        }
 }
 
+const (
+       simpleStrategyClass          string = "SimpleStrategy"
+       networkTopologyStrategyClass string = "NetworkTopologyStrategy"
+)
+
 type simpleStrategy struct {
        rf int
 }
 
+func (s *simpleStrategy) strategyClass() string {
+       return simpleStrategyClass
+}
+
+func (s *simpleStrategy) strategyKey() string {
+       return fmt.Sprintf("%s:rf=%d", s.strategyClass(), s.rf)

Review Comment:
   This looks good; I would only move the computation, like I mentioned in my comment on the NT strategy.



##########
metadata.go:
##########
@@ -243,95 +245,226 @@ const (
 
 // queries the cluster for schema information for a specific keyspace
 type schemaDescriber struct {
-       session *Session
-       mu      sync.Mutex
+       session         *Session
+       schemaRefresher *refreshDebouncer
+       schemaMeta      atomic.Value // *schemaMeta
+}
 
-       cache map[string]*KeyspaceMetadata
+// Constants for keyspace change events
+const (
+       KeyspaceCreated = "CREATED"
+       KeyspaceUpdated = "UPDATED"
+       KeyspaceDropped = "DROPPED"

Review Comment:
   I'd probably rename these 3 constants slightly because they are in the main `gocql` package, so there are no context clues to indicate that these refer to the [change type of the schema events](https://github.com/apache/cassandra/blob/trunk/doc/native_protocol_v5.spec#L997-L998). Something like `SchemaChangeTypeCreated`, maybe?



##########
session.go:
##########
@@ -367,8 +367,12 @@ func (s *Session) init() error {
 
        // Invoke KeyspaceChanged to let the policy cache the session keyspace
        // parameters. This is used by tokenAwareHostPolicy to discover 
replicas.
-       if !s.cfg.disableControlConn && s.cfg.Keyspace != "" {
-               s.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: 
s.cfg.Keyspace})
+       if !s.cfg.disableControlConn && s.schemaDescriber != nil {
+               err := s.schemaDescriber.refreshSchemaMetadata()
+               if err != nil {
+                       s.logger.Warning("Unable to refresh schema metadata.",

Review Comment:
   Add some info to this log message: `Unable to initialize schema metadata. 
Token aware host selection policies might fall back to round robin and attempts 
to retrieve keyspace metadata might fail with ErrKeyspaceDoesNotExist.`



##########
metadata.go:
##########
@@ -327,11 +344,99 @@ func (s *schemaDescriber) refreshSchema(keyspaceName 
string) error {
                materializedViews)
 
        // update the cache
-       s.cache[keyspaceName] = keyspace
+       s.mu.Lock()
+       defer s.mu.Unlock()
+       meta := s.getSchemaMetaForUpdate()
+       newKeyspaceMeta := make(map[string]*KeyspaceMetadata)
+       for ks, meta := range meta.keyspaceMeta {
+               newKeyspaceMeta[ks] = meta
+       }
+       newKeyspaceMeta[keyspaceName] = keyspace
+
+       meta.keyspaceMeta = newKeyspaceMeta
+       s.schemaMeta.Store(meta)
+
+       return nil
+}
+
+// forcibly updates the current KeyspaceMetadata held by the schema describer
+// for all the keyspaces.
+// This function is called via schemaRefresher refreshDebouncer to batch and
+// debounce schema refresh requests.
+func refreshSchemas(session *Session) error {
+       var err error
+
+       // query the system keyspace for schema data
+       keyspaces, err := getAllKeyspaceMetadata(session)
+       if err != nil {
+               return err
+       }
+       tables, err := getAllTablesMetadata(session)
+       if err != nil {
+               return err
+       }
+       columns, err := getAllColumnMetadata(session)
+       if err != nil {
+               return err
+       }
+       functions, err := getAllFunctionsMetadata(session)
+       if err != nil {
+               return err
+       }
+       aggregates, err := getAllAggregatesMetadata(session)
+       if err != nil {
+               return err
+       }
+       userTypes, err := getAllUserTypeMetadata(session)
+       if err != nil {
+               return err
+       }
+       materializedViews, err := getAllMaterializedViewsMetadata(session)
+       if err != nil {
+               return err
+       }
+
+       // organize the schema data
+       keyspaceMeta := map[string]*KeyspaceMetadata{}
+       for keyspaceName, keyspace := range keyspaces {
+               compileMetadata(session,
+                       keyspace,
+                       tables[keyspaceName],
+                       columns[keyspaceName],
+                       functions[keyspaceName],
+                       aggregates[keyspaceName],
+                       userTypes[keyspaceName],
+                       materializedViews[keyspaceName])
+               // update the cache
+               keyspaceMeta[keyspaceName] = keyspace
+       }
+       sd := session.schemaDescriber
+       sd.mu.Lock()
+       meta := sd.getSchemaMetaForUpdate()
+       meta.keyspaceMeta = keyspaceMeta
+       sd.schemaMeta.Store(meta)
+       sd.mu.Unlock()
+       // Notify policy if it supports schema refresh notifications
+       if notifier, ok := session.policy.(SchemaRefreshNotifier); ok {
+               notifier.SchemaRefreshed(sd.getSchemaMetaForRead())
+       }
 
        return nil
 }

Review Comment:
   This is still missing, I believe.



##########
topology.go:
##########
@@ -163,6 +179,33 @@ type networkTopology struct {
        dcs map[string]int
 }
 
+func (n *networkTopology) strategyClass() string {
+       return networkTopologyStrategyClass
+}
+
+func (n *networkTopology) strategyKey() string {
+       // Sort datacenter names for deterministic output
+       dcs := make([]string, 0, len(n.dcs))
+       for dc := range n.dcs {
+               dcs = append(dcs, dc)
+       }
+       sort.Strings(dcs)
+
+       // Build key with sorted dc=rf pairs using strings.Builder for 
efficiency
+       var b strings.Builder
+       b.WriteString(n.strategyClass())
+       b.WriteString(":")
+       for i, dc := range dcs {
+               if i > 0 {
+                       b.WriteString(",")

Review Comment:
   I'm a bit concerned about the use of `,` and `=` here because I'm not actually sure whether these characters are forbidden in datacenter names, so it could potentially lead to "collisions" between different strategies.
   
   I would prepend the length of the datacenter name and use `:` and `|` as delimiters:
   
   `{Strategy}|{LenDC}:{DC}:{RF}|...`
   example: `NetworkTopologyStrategy|3:dc1:3|3:dc2:1`
   
   I think this will prevent issues where the dc name is weird — for example, let's say there is only one dc with the name `dc1:3|3:dc2`; the length prefix would make that case (`NetworkTopologyStrategy|11:dc1:3|3:dc2:1`) different from the two-dc case above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to