joao-r-reis commented on code in PR #1926:
URL:
https://github.com/apache/cassandra-gocql-driver/pull/1926#discussion_r2827744897
##########
events.go:
##########
@@ -137,16 +137,16 @@ func (s *Session) handleSchemaEvent(frames []frame) {
for _, frame := range frames {
switch f := frame.(type) {
case *schemaChangeKeyspace:
- s.schemaDescriber.clearSchema(f.keyspace)
+ s.schemaDescriber.debounceRefreshSchemaMetadata()
s.handleKeyspaceChange(f.keyspace, f.change)
Review Comment:
Make this a part of the schema refresh call (I go into more detail in my
comment on that function)
##########
metadata.go:
##########
@@ -327,11 +344,99 @@ func (s *schemaDescriber) refreshSchema(keyspaceName
string) error {
materializedViews)
// update the cache
- s.cache[keyspaceName] = keyspace
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ meta := s.getSchemaMetaForUpdate()
+ newKeyspaceMeta := make(map[string]*KeyspaceMetadata)
+ for ks, meta := range meta.keyspaceMeta {
+ newKeyspaceMeta[ks] = meta
+ }
+ newKeyspaceMeta[keyspaceName] = keyspace
+
+ meta.keyspaceMeta = newKeyspaceMeta
+ s.schemaMeta.Store(meta)
+
+ return nil
+}
+
+// forcibly updates the current KeyspaceMetadata held by the schema describer
+// for all the keyspaces.
+// This function is called via schemaRefresher refreshDebouncer to batch and
+// debounce schema refresh requests.
+func refreshSchemas(session *Session) error {
+ var err error
+
+ // query the system keyspace for schema data
+ keyspaces, err := getAllKeyspaceMetadata(session)
+ if err != nil {
+ return err
+ }
+ tables, err := getAllTablesMetadata(session)
+ if err != nil {
+ return err
+ }
+ columns, err := getAllColumnMetadata(session)
+ if err != nil {
+ return err
+ }
+ functions, err := getAllFunctionsMetadata(session)
+ if err != nil {
+ return err
+ }
+ aggregates, err := getAllAggregatesMetadata(session)
+ if err != nil {
+ return err
+ }
+ userTypes, err := getAllUserTypeMetadata(session)
+ if err != nil {
+ return err
+ }
+ materializedViews, err := getAllMaterializedViewsMetadata(session)
+ if err != nil {
+ return err
+ }
+
+ // organize the schema data
+ keyspaceMeta := map[string]*KeyspaceMetadata{}
+ for keyspaceName, keyspace := range keyspaces {
+ compileMetadata(session,
+ keyspace,
+ tables[keyspaceName],
+ columns[keyspaceName],
+ functions[keyspaceName],
+ aggregates[keyspaceName],
+ userTypes[keyspaceName],
+ materializedViews[keyspaceName])
+ // update the cache
+ keyspaceMeta[keyspaceName] = keyspace
+ }
+ sd := session.schemaDescriber
+ sd.mu.Lock()
+ meta := sd.getSchemaMetaForUpdate()
+ meta.keyspaceMeta = keyspaceMeta
+ sd.schemaMeta.Store(meta)
+ sd.mu.Unlock()
+ // Notify policy if it supports schema refresh notifications
+ if notifier, ok := session.policy.(SchemaRefreshNotifier); ok {
+ notifier.SchemaRefreshed(sd.getSchemaMetaForRead())
Review Comment:
We only need to refresh the replicas if keyspace metadata changed, so we should
do that computation here to determine whether we need to notify. We should
probably also rename the interface (and its method) to something more
specific to keyspaces rather than schema in general.
##########
policies.go:
##########
@@ -324,6 +324,10 @@ type HostSelectionPolicy interface {
Pick(statement ExecutableStatement) NextHost
}
+type SchemaRefreshNotifier interface {
Review Comment:
make the interface and the method unexported
##########
metadata.go:
##########
@@ -327,11 +344,99 @@ func (s *schemaDescriber) refreshSchema(keyspaceName
string) error {
materializedViews)
// update the cache
- s.cache[keyspaceName] = keyspace
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ meta := s.getSchemaMetaForUpdate()
+ newKeyspaceMeta := make(map[string]*KeyspaceMetadata)
+ for ks, meta := range meta.keyspaceMeta {
+ newKeyspaceMeta[ks] = meta
+ }
+ newKeyspaceMeta[keyspaceName] = keyspace
+
+ meta.keyspaceMeta = newKeyspaceMeta
+ s.schemaMeta.Store(meta)
+
+ return nil
+}
+
+// forcibly updates the current KeyspaceMetadata held by the schema describer
+// for all the keyspaces.
+// This function is called via schemaRefresher refreshDebouncer to batch and
+// debounce schema refresh requests.
+func refreshSchemas(session *Session) error {
+ var err error
+
+ // query the system keyspace for schema data
Review Comment:
wait for schema agreement here
##########
metadata.go:
##########
@@ -327,11 +344,99 @@ func (s *schemaDescriber) refreshSchema(keyspaceName
string) error {
materializedViews)
// update the cache
- s.cache[keyspaceName] = keyspace
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ meta := s.getSchemaMetaForUpdate()
+ newKeyspaceMeta := make(map[string]*KeyspaceMetadata)
+ for ks, meta := range meta.keyspaceMeta {
+ newKeyspaceMeta[ks] = meta
+ }
+ newKeyspaceMeta[keyspaceName] = keyspace
+
+ meta.keyspaceMeta = newKeyspaceMeta
+ s.schemaMeta.Store(meta)
+
+ return nil
+}
+
+// forcibly updates the current KeyspaceMetadata held by the schema describer
+// for all the keyspaces.
+// This function is called via schemaRefresher refreshDebouncer to batch and
+// debounce schema refresh requests.
+func refreshSchemas(session *Session) error {
+ var err error
+
+ // query the system keyspace for schema data
+ keyspaces, err := getAllKeyspaceMetadata(session)
+ if err != nil {
+ return err
+ }
+ tables, err := getAllTablesMetadata(session)
+ if err != nil {
+ return err
+ }
+ columns, err := getAllColumnMetadata(session)
+ if err != nil {
+ return err
+ }
+ functions, err := getAllFunctionsMetadata(session)
+ if err != nil {
+ return err
+ }
+ aggregates, err := getAllAggregatesMetadata(session)
+ if err != nil {
+ return err
+ }
+ userTypes, err := getAllUserTypeMetadata(session)
+ if err != nil {
+ return err
+ }
+ materializedViews, err := getAllMaterializedViewsMetadata(session)
+ if err != nil {
+ return err
+ }
+
+ // organize the schema data
+ keyspaceMeta := map[string]*KeyspaceMetadata{}
+ for keyspaceName, keyspace := range keyspaces {
+ compileMetadata(session,
+ keyspace,
+ tables[keyspaceName],
+ columns[keyspaceName],
+ functions[keyspaceName],
+ aggregates[keyspaceName],
+ userTypes[keyspaceName],
+ materializedViews[keyspaceName])
+ // update the cache
+ keyspaceMeta[keyspaceName] = keyspace
+ }
+ sd := session.schemaDescriber
+ sd.mu.Lock()
Review Comment:
do we still need the lock if the refresh only happens in the debouncer?
##########
events.go:
##########
Review Comment:
this function is calling the debouncer already
##########
metadata.go:
##########
@@ -327,11 +344,99 @@ func (s *schemaDescriber) refreshSchema(keyspaceName
string) error {
materializedViews)
// update the cache
- s.cache[keyspaceName] = keyspace
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ meta := s.getSchemaMetaForUpdate()
+ newKeyspaceMeta := make(map[string]*KeyspaceMetadata)
+ for ks, meta := range meta.keyspaceMeta {
+ newKeyspaceMeta[ks] = meta
+ }
+ newKeyspaceMeta[keyspaceName] = keyspace
+
+ meta.keyspaceMeta = newKeyspaceMeta
+ s.schemaMeta.Store(meta)
+
+ return nil
+}
+
+// forcibly updates the current KeyspaceMetadata held by the schema describer
+// for all the keyspaces.
+// This function is called via schemaRefresher refreshDebouncer to batch and
+// debounce schema refresh requests.
+func refreshSchemas(session *Session) error {
+ var err error
+
+ // query the system keyspace for schema data
+ keyspaces, err := getAllKeyspaceMetadata(session)
+ if err != nil {
+ return err
+ }
+ tables, err := getAllTablesMetadata(session)
+ if err != nil {
+ return err
+ }
+ columns, err := getAllColumnMetadata(session)
+ if err != nil {
+ return err
+ }
+ functions, err := getAllFunctionsMetadata(session)
+ if err != nil {
+ return err
+ }
+ aggregates, err := getAllAggregatesMetadata(session)
+ if err != nil {
+ return err
+ }
+ userTypes, err := getAllUserTypeMetadata(session)
+ if err != nil {
+ return err
+ }
+ materializedViews, err := getAllMaterializedViewsMetadata(session)
+ if err != nil {
+ return err
+ }
+
+ // organize the schema data
+ keyspaceMeta := map[string]*KeyspaceMetadata{}
+ for keyspaceName, keyspace := range keyspaces {
+ compileMetadata(session,
+ keyspace,
+ tables[keyspaceName],
+ columns[keyspaceName],
+ functions[keyspaceName],
+ aggregates[keyspaceName],
+ userTypes[keyspaceName],
+ materializedViews[keyspaceName])
+ // update the cache
+ keyspaceMeta[keyspaceName] = keyspace
+ }
+ sd := session.schemaDescriber
+ sd.mu.Lock()
+ meta := sd.getSchemaMetaForUpdate()
+ meta.keyspaceMeta = keyspaceMeta
+ sd.schemaMeta.Store(meta)
+ sd.mu.Unlock()
+ // Notify policy if it supports schema refresh notifications
+ if notifier, ok := session.policy.(SchemaRefreshNotifier); ok {
+ notifier.SchemaRefreshed(sd.getSchemaMetaForRead())
+ }
Review Comment:
adding to the previous comment, we should change this to something like:
```
// rough code, just typing it now
notifier, notifierExists := session.policy.(SchemaRefreshNotifier)
keyspacesChanged := false
for keyspaceName, keyspaceMetadata := range newKeyspaceMetadata {
var changeType string
oldKeyspaceMetadata, exists := oldKeyspaces[keyspaceName]
if !exists {
changeType = "CREATED"
keyspacesChanged = true
} else {
// compare keyspace replication
// if keyspace replication is different then changeType = "UPDATED"
// and keyspacesChanged = true
}
if !notifierExists && changeType != "" {
session.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace:
keyspaceName, Change:changeType})
}
}
// also need to do the same process for keyspaces that were dropped with
// changetype "DROPPED", ideally without having to loop the entire old keyspace
// keyset
// in the end call notifier
if keyspacesChanged && notifierExists {
notifier.SchemaRefreshed(sd.getSchemaMetaForRead())
}
```
With this process we can remove `func (s *Session)
handleKeyspaceChange(keyspace, change string)` completely.
I'd probably implement the interface on the other policies that the driver
has so we can avoid the unnecessary `session.policy.KeyspaceChanged()` calls
(empty implementations basically)
##########
metadata.go:
##########
@@ -243,51 +245,66 @@ const (
// queries the cluster for schema information for a specific keyspace
type schemaDescriber struct {
- session *Session
- mu sync.Mutex
+ session *Session
+ mu sync.Mutex
+ schemaRefresher *refreshDebouncer
+ schemaMeta atomic.Value // *schemaMeta
+}
- cache map[string]*KeyspaceMetadata
+type schemaMeta struct {
+ keyspaceMeta map[string]*KeyspaceMetadata
}
// creates a session bound schema describer which will query and cache
// keyspace metadata
func newSchemaDescriber(session *Session) *schemaDescriber {
- return &schemaDescriber{
+ meta := new(schemaMeta)
+ describer := &schemaDescriber{
session: session,
- cache: map[string]*KeyspaceMetadata{},
}
Review Comment:
missing initialization of the debouncer
##########
policies.go:
##########
@@ -447,43 +452,39 @@ func (t *tokenAwareHostPolicy) Init(s *Session) {
}
t.getKeyspaceMetadata = s.KeyspaceMetadata
t.getKeyspaceName = func() string { return s.cfg.Keyspace }
+ t.getSchemaMeta = s.schemaDescriber.getSchemaMetaForRead
t.logger = s.logger
}
func (t *tokenAwareHostPolicy) IsLocal(host *HostInfo) bool {
return t.fallback.IsLocal(host)
}
-func (t *tokenAwareHostPolicy) KeyspaceChanged(update KeyspaceUpdateEvent) {
+// replica map is updated through the SchemaRefreshed callback
+func (t *tokenAwareHostPolicy) KeyspaceChanged(update KeyspaceUpdateEvent) {}
+
+func (t *tokenAwareHostPolicy) SchemaRefreshed(schemaMeta *schemaMeta) {
t.mu.Lock()
defer t.mu.Unlock()
meta := t.getMetadataForUpdate()
- t.updateReplicas(meta, update.Keyspace)
+ t.updateReplicas(meta, schemaMeta)
t.metadata.Store(meta)
}
// updateReplicas updates replicas in clusterMeta.
// It must be called with t.mu mutex locked.
// meta must not be nil and it's replicas field will be updated.
-func (t *tokenAwareHostPolicy) updateReplicas(meta *clusterMeta, keyspace
string) {
- newReplicas := make(map[string]tokenRingReplicas, len(meta.replicas))
-
- ks, err := t.getKeyspaceMetadata(keyspace)
- if err == nil {
- strat := getStrategy(ks, t.logger)
+func (t *tokenAwareHostPolicy) updateReplicas(meta *clusterMeta, schemaMeta
*schemaMeta) {
+ schema := schemaMeta.keyspaceMeta
+ newReplicas := make(map[string]tokenRingReplicas, len(schema))
+ for keyspace, metadata := range schema {
+ strat := getStrategy(metadata, t.logger)
if strat != nil {
if meta != nil && meta.tokenRing != nil {
newReplicas[keyspace] =
strat.replicaMap(meta.tokenRing)
Review Comment:
We need to add some kind of caching here to avoid computing the tokenmap
every time for every keyspace. Most of the time, many keyspaces share
the same replication configuration.
The java driver converts the replication configuration to a
`Map<string,string>` and stores this in a `Map<string, Map<string, string>>` so
that it can lookup the replication configuration per keyspace name. Then it
adds all of these replication configuration map objects to a `Set` and then
iterates on this set to compute the replicas and stores the replicas per
replication configuration instead of per keyspace. [Reference
here](https://github.com/apache/cassandra-java-driver/blob/4.x/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/token/DefaultTokenMap.java#L78-L95).
Then on the `getReplicas` method it looks up the replication configuration
by keyspace name and then looks up the replicas by replication configuration.
[Reference
here](https://github.com/apache/cassandra-java-driver/blob/4.x/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/token/DefaultTokenMap.java#L176-L178).
To accomplish this in Go we can use a comparable struct instead of
`Map<String,String>` as a "key" representing the replication configuration
which allows us to basically create a cache of replicas by replication
configuration. Or we can convert the replication configuration to a string but
personally I think that's more error prone.
##########
metadata.go:
##########
@@ -327,11 +344,99 @@ func (s *schemaDescriber) refreshSchema(keyspaceName
string) error {
materializedViews)
// update the cache
- s.cache[keyspaceName] = keyspace
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ meta := s.getSchemaMetaForUpdate()
+ newKeyspaceMeta := make(map[string]*KeyspaceMetadata)
+ for ks, meta := range meta.keyspaceMeta {
+ newKeyspaceMeta[ks] = meta
+ }
+ newKeyspaceMeta[keyspaceName] = keyspace
+
+ meta.keyspaceMeta = newKeyspaceMeta
+ s.schemaMeta.Store(meta)
+
+ return nil
+}
+
+// forcibly updates the current KeyspaceMetadata held by the schema describer
+// for all the keyspaces.
+// This function is called via schemaRefresher refreshDebouncer to batch and
+// debounce schema refresh requests.
+func refreshSchemas(session *Session) error {
+ var err error
+
+ // query the system keyspace for schema data
+ keyspaces, err := getAllKeyspaceMetadata(session)
+ if err != nil {
+ return err
+ }
+ tables, err := getAllTablesMetadata(session)
+ if err != nil {
+ return err
+ }
+ columns, err := getAllColumnMetadata(session)
+ if err != nil {
+ return err
+ }
+ functions, err := getAllFunctionsMetadata(session)
+ if err != nil {
+ return err
+ }
+ aggregates, err := getAllAggregatesMetadata(session)
+ if err != nil {
+ return err
+ }
+ userTypes, err := getAllUserTypeMetadata(session)
+ if err != nil {
+ return err
+ }
+ materializedViews, err := getAllMaterializedViewsMetadata(session)
+ if err != nil {
+ return err
+ }
+
+ // organize the schema data
+ keyspaceMeta := map[string]*KeyspaceMetadata{}
+ for keyspaceName, keyspace := range keyspaces {
+ compileMetadata(session,
+ keyspace,
+ tables[keyspaceName],
+ columns[keyspaceName],
+ functions[keyspaceName],
+ aggregates[keyspaceName],
+ userTypes[keyspaceName],
+ materializedViews[keyspaceName])
+ // update the cache
+ keyspaceMeta[keyspaceName] = keyspace
+ }
+ sd := session.schemaDescriber
+ sd.mu.Lock()
+ meta := sd.getSchemaMetaForUpdate()
+ meta.keyspaceMeta = keyspaceMeta
+ sd.schemaMeta.Store(meta)
+ sd.mu.Unlock()
+ // Notify policy if it supports schema refresh notifications
+ if notifier, ok := session.policy.(SchemaRefreshNotifier); ok {
+ notifier.SchemaRefreshed(sd.getSchemaMetaForRead())
+ }
return nil
}
Review Comment:
log at debug level the amount of time it took to run the full refresh
##########
metadata.go:
##########
@@ -243,51 +245,66 @@ const (
// queries the cluster for schema information for a specific keyspace
type schemaDescriber struct {
- session *Session
- mu sync.Mutex
+ session *Session
+ mu sync.Mutex
+ schemaRefresher *refreshDebouncer
+ schemaMeta atomic.Value // *schemaMeta
+}
- cache map[string]*KeyspaceMetadata
+type schemaMeta struct {
+ keyspaceMeta map[string]*KeyspaceMetadata
}
// creates a session bound schema describer which will query and cache
// keyspace metadata
func newSchemaDescriber(session *Session) *schemaDescriber {
- return &schemaDescriber{
+ meta := new(schemaMeta)
+ describer := &schemaDescriber{
session: session,
- cache: map[string]*KeyspaceMetadata{},
}
+ describer.schemaMeta.Store(meta)
+ return describer
+}
+
+func (s *schemaDescriber) getSchemaMetaForRead() *schemaMeta {
+ meta, _ := s.schemaMeta.Load().(*schemaMeta)
+ return meta
+}
+
+func (s *schemaDescriber) getSchemaMetaForUpdate() *schemaMeta {
+ meta := s.getSchemaMetaForRead()
+ metaNew := new(schemaMeta)
+ if meta != nil {
+ *metaNew = *meta
+ }
+ return metaNew
}
// returns the cached KeyspaceMetadata held by the describer for the named
// keyspace.
func (s *schemaDescriber) getSchema(keyspaceName string) (*KeyspaceMetadata,
error) {
- s.mu.Lock()
- defer s.mu.Unlock()
-
- metadata, found := s.cache[keyspaceName]
+ metadata, found := s.getSchemaMetaForRead().keyspaceMeta[keyspaceName]
if !found {
- // refresh the cache for this keyspace
- err := s.refreshSchema(keyspaceName)
+ // refresh the cache
+ err := s.refreshSchemaMetadata()
if err != nil {
return nil, err
}
-
- metadata = s.cache[keyspaceName]
+ metadata, found =
s.getSchemaMetaForRead().keyspaceMeta[keyspaceName]
+ if !found {
+ return nil, ErrKeyspaceDoesNotExist
+ }
}
return metadata, nil
}
-// clears the already cached keyspace metadata
-func (s *schemaDescriber) clearSchema(keyspaceName string) {
- s.mu.Lock()
- defer s.mu.Unlock()
-
- delete(s.cache, keyspaceName)
-}
-
// forcibly updates the current KeyspaceMetadata held by the schema describer
// for a given named keyspace.
+//
+// Deprecated: Schema refreshes should be triggered through the schemaRefresher
+// refreshDebouncer (debounceRefreshSchemaMetadata) which batches and debounces
+// refresh requests for all keyspaces via refreshSchemas.
func (s *schemaDescriber) refreshSchema(keyspaceName string) error {
Review Comment:
this isn't exported so it can just be removed (along with the functions that
fetch data by keyspace that become dead code)
##########
metadata.go:
##########
@@ -243,51 +245,66 @@ const (
// queries the cluster for schema information for a specific keyspace
type schemaDescriber struct {
- session *Session
- mu sync.Mutex
+ session *Session
+ mu sync.Mutex
+ schemaRefresher *refreshDebouncer
+ schemaMeta atomic.Value // *schemaMeta
+}
- cache map[string]*KeyspaceMetadata
+type schemaMeta struct {
+ keyspaceMeta map[string]*KeyspaceMetadata
}
// creates a session bound schema describer which will query and cache
// keyspace metadata
func newSchemaDescriber(session *Session) *schemaDescriber {
- return &schemaDescriber{
+ meta := new(schemaMeta)
+ describer := &schemaDescriber{
session: session,
- cache: map[string]*KeyspaceMetadata{},
}
+ describer.schemaMeta.Store(meta)
+ return describer
+}
+
+func (s *schemaDescriber) getSchemaMetaForRead() *schemaMeta {
+ meta, _ := s.schemaMeta.Load().(*schemaMeta)
+ return meta
+}
+
+func (s *schemaDescriber) getSchemaMetaForUpdate() *schemaMeta {
+ meta := s.getSchemaMetaForRead()
+ metaNew := new(schemaMeta)
+ if meta != nil {
+ *metaNew = *meta
+ }
+ return metaNew
}
// returns the cached KeyspaceMetadata held by the describer for the named
// keyspace.
func (s *schemaDescriber) getSchema(keyspaceName string) (*KeyspaceMetadata,
error) {
- s.mu.Lock()
- defer s.mu.Unlock()
-
- metadata, found := s.cache[keyspaceName]
+ metadata, found := s.getSchemaMetaForRead().keyspaceMeta[keyspaceName]
if !found {
- // refresh the cache for this keyspace
- err := s.refreshSchema(keyspaceName)
+ // refresh the cache
+ err := s.refreshSchemaMetadata()
Review Comment:
With this new implementation we expect the driver to always have the cache
up to date so if we don't find the metadata then just return
`ErrKeyspaceDoesNotExist` right away.
For this to work properly though we need to add a call to
`s.refreshSchemaMetadata()` as a replacement to this block in `session.init()`:
```
// Invoke KeyspaceChanged to let the policy cache the session keyspace
// parameters. This is used by tokenAwareHostPolicy to discover
replicas.
if !s.cfg.disableControlConn && s.cfg.Keyspace != "" {
s.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace:
s.cfg.Keyspace})
}
```
And we need to add a `s.refreshSchemaMetadata()` call to `(c *controlConn)
setupConn` inside the `if c.session.initialized() {` block
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]