raj14243 commented on code in PR #1926:
URL: 
https://github.com/apache/cassandra-gocql-driver/pull/1926#discussion_r2835187168


##########
metadata.go:
##########
@@ -243,95 +245,226 @@ const (
 
 // queries the cluster for schema information for a specific keyspace
 type schemaDescriber struct {
-       session *Session
-       mu      sync.Mutex
+       session         *Session
+       schemaRefresher *refreshDebouncer
+       schemaMeta      atomic.Value // *schemaMeta
+}
 
-       cache map[string]*KeyspaceMetadata
+// Constants for keyspace change events
+const (
+       KeyspaceCreated = "CREATED"
+       KeyspaceUpdated = "UPDATED"
+       KeyspaceDropped = "DROPPED"
+)
+
+type schemaMeta struct {
+       keyspaceMeta map[string]*KeyspaceMetadata
 }
 
 // creates a session bound schema describer which will query and cache
 // keyspace metadata
-func newSchemaDescriber(session *Session) *schemaDescriber {
-       return &schemaDescriber{
+func newSchemaDescriber(session *Session, schemaRefresher *refreshDebouncer) 
*schemaDescriber {
+       meta := new(schemaMeta)
+       describer := &schemaDescriber{
                session: session,
-               cache:   map[string]*KeyspaceMetadata{},
        }
+       describer.schemaMeta.Store(meta)
+       describer.schemaRefresher = schemaRefresher
+       return describer
+}
+
+func (s *schemaDescriber) getSchemaMetaForRead() *schemaMeta {
+       meta, _ := s.schemaMeta.Load().(*schemaMeta)
+       return meta
+}
+
+func (s *schemaDescriber) getSchemaMetaForUpdate() *schemaMeta {
+       meta := s.getSchemaMetaForRead()
+       metaNew := new(schemaMeta)
+       if meta != nil {
+               *metaNew = *meta
+       }
+       return metaNew
 }
 
 // returns the cached KeyspaceMetadata held by the describer for the named
 // keyspace.
 func (s *schemaDescriber) getSchema(keyspaceName string) (*KeyspaceMetadata, 
error) {
-       s.mu.Lock()
-       defer s.mu.Unlock()
+       if s.session.cfg.MetadataCacheMode == Disabled {
+               return s.fetchSchema(keyspaceName)
+       }
+       metadata, found := s.getSchemaMetaForRead().keyspaceMeta[keyspaceName]
 
-       metadata, found := s.cache[keyspaceName]
        if !found {
-               // refresh the cache for this keyspace
-               err := s.refreshSchema(keyspaceName)
-               if err != nil {
-                       return nil, err
-               }
-
-               metadata = s.cache[keyspaceName]
+               return nil, ErrKeyspaceDoesNotExist
        }
 
        return metadata, nil
 }
 
-// clears the already cached keyspace metadata
-func (s *schemaDescriber) clearSchema(keyspaceName string) {
-       s.mu.Lock()
-       defer s.mu.Unlock()
-
-       delete(s.cache, keyspaceName)
-}
-
-// forcibly updates the current KeyspaceMetadata held by the schema describer
-// for a given named keyspace.
-func (s *schemaDescriber) refreshSchema(keyspaceName string) error {
+func (s *schemaDescriber) fetchSchema(keyspaceName string) (*KeyspaceMetadata, 
error) {
        var err error
 
        // query the system keyspace for schema data
        // TODO retrieve concurrently
        keyspace, err := getKeyspaceMetadata(s.session, keyspaceName)
        if err != nil {
-               return err
+               return nil, err
        }
        tables, err := getTableMetadata(s.session, keyspaceName)
        if err != nil {
-               return err
+               return nil, err
        }
        columns, err := getColumnMetadata(s.session, keyspaceName)
        if err != nil {
-               return err
+               return nil, err
        }
        functions, err := getFunctionsMetadata(s.session, keyspaceName)
        if err != nil {
-               return err
+               return nil, err
        }
        aggregates, err := getAggregatesMetadata(s.session, keyspaceName)
        if err != nil {
-               return err
+               return nil, err
        }
        userTypes, err := getUserTypeMetadata(s.session, keyspaceName)
        if err != nil {
-               return err
+               return nil, err
        }
        materializedViews, err := getMaterializedViewsMetadata(s.session, 
keyspaceName)
        if err != nil {
-               return err
+               return nil, err
        }
 
        // organize the schema data
        compileMetadata(s.session, keyspace, tables, columns, functions, 
aggregates, userTypes,
                materializedViews)
 
-       // update the cache
-       s.cache[keyspaceName] = keyspace
+       return keyspace, nil
+}
 
+// forcibly updates the current KeyspaceMetadata held by the schema describer
+// for all the keyspaces.
+// This function is called via schemaRefresher refreshDebouncer to batch and
+// debounce schema refresh requests.
+func refreshSchemas(session *Session) error {
+       if session.cfg.MetadataCacheMode == Disabled {
+               return nil
+       }
+       session.control.awaitSchemaAgreement()
+       var err error
+       var keyspaceMeta map[string]*KeyspaceMetadata
+       // query the system keyspace for schema data
+       keyspaces, err := getAllKeyspaceMetadata(session)
+       if err != nil {
+               return err
+       }
+       var tables map[string][]TableMetadata
+       var columns map[string][]ColumnMetadata
+       var functions map[string][]FunctionMetadata
+       var aggregates map[string][]AggregateMetadata
+       var userTypes map[string][]UserTypeMetadata
+       var materializedViews map[string][]MaterializedViewMetadata
+       if session.cfg.MetadataCacheMode == Full {
+               tables, err = getAllTablesMetadata(session)
+               if err != nil {
+                       return err
+               }
+               columns, err = getAllColumnMetadata(session)
+               if err != nil {
+                       return err
+               }
+               functions, err = getAllFunctionsMetadata(session)
+               if err != nil {
+                       return err
+               }
+               aggregates, err = getAllAggregatesMetadata(session)
+               if err != nil {
+                       return err
+               }
+               userTypes, err = getAllUserTypeMetadata(session)
+               if err != nil {
+                       return err
+               }
+               materializedViews, err = 
getAllMaterializedViewsMetadata(session)
+               if err != nil {
+                       return err
+               }
+       }
+
+       // organize the schema data
+       keyspaceMeta = make(map[string]*KeyspaceMetadata)
+       sd := session.schemaDescriber
+       meta := sd.getSchemaMetaForUpdate()
+       oldKeyspaceMeta := meta.keyspaceMeta
+       var newKeyspaces []string
+       var updatedKeyspaces []string
+       var droppedKeyspaces []string
+       for keyspaceName, keyspace := range keyspaces {
+               compileMetadata(session,
+                       keyspace,
+                       tables[keyspaceName],
+                       columns[keyspaceName],
+                       functions[keyspaceName],
+                       aggregates[keyspaceName],
+                       userTypes[keyspaceName],
+                       materializedViews[keyspaceName])
+               // update the cache
+               keyspaceMeta[keyspaceName] = keyspace
+               if _, ok := oldKeyspaceMeta[keyspaceName]; !ok {
+                       newKeyspaces = append(newKeyspaces, keyspaceName)
+               } else {
+                       newStrat := keyspace.placementStrategy
+                       oldStrat := 
oldKeyspaceMeta[keyspaceName].placementStrategy
+
+                       if (newStrat == nil) != (oldStrat == nil) {
+                               updatedKeyspaces = append(updatedKeyspaces, 
keyspaceName)
+                       } else if newStrat != nil && newStrat.strategyKey() != 
oldStrat.strategyKey() {
+                               updatedKeyspaces = append(updatedKeyspaces, 
keyspaceName)
+                       }
+               }
+       }
+       droppedKeyspaces = sd.getDroppedKeyspaces(oldKeyspaceMeta, keyspaces)
+       meta.keyspaceMeta = keyspaceMeta
+       sd.schemaMeta.Store(meta)
+
+       for _, createdKeyspace := range newKeyspaces {
+               session.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: 
createdKeyspace, Change: KeyspaceCreated})
+       }
+       for _, droppedKeyspace := range droppedKeyspaces {
+               session.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: 
droppedKeyspace, Change: KeyspaceDropped})
+       }
+       for _, updatedKeyspace := range updatedKeyspaces {
+               session.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: 
updatedKeyspace, Change: KeyspaceUpdated})
+       }

Review Comment:
   i'll bring back the notifier interface



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to