joao-r-reis commented on code in PR #1926:
URL:
https://github.com/apache/cassandra-gocql-driver/pull/1926#discussion_r2848295367
##########
cassandra_test.go:
##########
@@ -2677,119 +2757,149 @@ func TestKeyspaceMetadata(t *testing.T) {
t.Fatalf("failed to create index with err: %v", err)
}
- keyspaceMetadata, err := session.KeyspaceMetadata("gocql_test")
- if err != nil {
- t.Fatalf("failed to query keyspace metadata with err: %v", err)
- }
- if keyspaceMetadata == nil {
- t.Fatal("expected the keyspace metadata to not be nil, but it
was nil")
- }
- if keyspaceMetadata.Name != session.cfg.Keyspace {
- t.Fatalf("Expected the keyspace name to be %s but was %s",
session.cfg.Keyspace, keyspaceMetadata.Name)
- }
- if len(keyspaceMetadata.Tables) == 0 {
- t.Errorf("Expected tables but there were none")
- }
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ session := createSession(t, func(config *ClusterConfig)
{
+ config.MetadataCacheMode = tc.cacheMode
+ })
+ defer session.Close()
+ // Query keyspace metadata
+ keyspaceMetadata, err :=
session.KeyspaceMetadata("gocql_test")
+ if err != nil {
+ t.Fatalf("failed to query keyspace metadata
with err: %v", err)
+ }
+ if keyspaceMetadata == nil {
+ t.Fatal("expected the keyspace metadata to not
be nil, but it was nil")
+ }
+ if keyspaceMetadata.Name != session.cfg.Keyspace {
+ t.Fatalf("Expected the keyspace name to be %s
but was %s", session.cfg.Keyspace, keyspaceMetadata.Name)
+ }
- tableMetadata, found := keyspaceMetadata.Tables["test_metadata"]
- if !found {
- t.Fatalf("failed to find the test_metadata table metadata")
- }
+ if tc.expectFullMetadata {
+ if len(keyspaceMetadata.Tables) == 0 {
Review Comment:
Can we add a quick check for when the metadata cache is disabled to make sure
the cache is empty?
##########
cassandra_test.go:
##########
@@ -2955,13 +3065,40 @@ func TestTokenAwareConnPool(t *testing.T) {
t.Fatalf("failed to insert with err: %v", err)
}
- query = session.Query("SELECT data FROM test_token_aware where id = ?",
42).Consistency(One)
+ // Verify token-aware routing: queries with the same partition key
should
+ // consistently go to the same host (proving token awareness)
+ var queriedHosts []*HostInfo
+
+ observer := funcQueryObserver(func(ctx context.Context, o
ObservedQuery) {
Review Comment:
I think in addition to this we should also use `Query.Trace()`. If I'm not
mistaken, when token awareness is working correctly the trace events obtained
from C* (each event is a row from the table, I think) would have `Source` be
the same as `Coordinator`, which means every event originated from the
coordinator, so there were no hops to nodes that weren't the coordinator
(we may need to set consistency level ONE for this).
I think the default implementation, `NewTraceWriter`, already reads the trace
data from the system tables and writes it to the provided `io.Writer`. We could
create a custom test tracer that stores or outputs this data in a different
format, but the existing tracer is probably good enough for what we want to
test, although it will require some string parsing/matching. See [C# driver example
here](https://github.com/datastax/csharp-driver/blob/f68365ba64117d57b3c26763612dcfff35dab64f/src/Cassandra.IntegrationTests/Policies/Tests/LoadBalancingPolicyShortTests.cs#L84-L88).
##########
cluster.go:
##########
@@ -42,6 +42,36 @@ type PoolConfig struct {
HostSelectionPolicy HostSelectionPolicy
}
+// MetadataCacheMode controls how the driver reads and caches schema metadata
from Cassandra system tables.
+// This affects the behavior of Session.KeyspaceMetadata and token-aware host
selection policies.
+//
+// See the individual mode constants (Full, KeyspaceOnly, Disabled) for
detailed behavior of each mode.
+type MetadataCacheMode int
+
+const (
+ // Full mode reads and caches all schema metadata including keyspaces,
tables, columns,
+ // functions, aggregates, user-defined types, and materialized views.
+ //
+ // Token-aware routing works normally with full replica information.
+ // Session.KeyspaceMetadata returns cached metadata without querying
system tables.
+ Full MetadataCacheMode = iota
+
+ // KeyspaceOnly mode reads and caches only keyspace metadata
(replication strategy and options).
+ // This enables token-aware routing without the overhead of caching
detailed schema information.
+ //
+ // Token-aware routing works normally with full replica information.
Review Comment:
`Token-aware routing works normally (if TokenAwareHostPolicy is used) with
full replica information.`
##########
cluster.go:
##########
@@ -42,6 +42,36 @@ type PoolConfig struct {
HostSelectionPolicy HostSelectionPolicy
}
+// MetadataCacheMode controls how the driver reads and caches schema metadata
from Cassandra system tables.
+// This affects the behavior of Session.KeyspaceMetadata and token-aware host
selection policies.
+//
+// See the individual mode constants (Full, KeyspaceOnly, Disabled) for
detailed behavior of each mode.
+type MetadataCacheMode int
+
+const (
+ // Full mode reads and caches all schema metadata including keyspaces,
tables, columns,
+ // functions, aggregates, user-defined types, and materialized views.
+ //
+ // Token-aware routing works normally with full replica information.
+ // Session.KeyspaceMetadata returns cached metadata without querying
system tables.
+ Full MetadataCacheMode = iota
+
+ // KeyspaceOnly mode reads and caches only keyspace metadata
(replication strategy and options).
+ // This enables token-aware routing without the overhead of caching
detailed schema information.
Review Comment:
`This enables token-aware routing (if TokenAwareHostPolicy is used)`
##########
metadata.go:
##########
@@ -243,95 +246,257 @@ const (
// queries the cluster for schema information for a specific keyspace
type schemaDescriber struct {
- session *Session
- mu sync.Mutex
+ session *Session
+ schemaRefresher *refreshDebouncer
+ schemaMeta atomic.Value // *schemaMeta
+}
+
+// Schema change type constants as defined in the Cassandra Native Protocol
specification.
+// These values indicate the nature of schema modifications that occurred.
+//
+// See:
https://cassandra.apache.org/doc/latest/cassandra/reference/native-protocol.html
+//
+// Schema change events are server-initiated messages sent to clients that
have registered
+// for schema change notifications. These events indicate modifications to
keyspaces, tables,
+// user-defined types, functions, or aggregates.
+const (
+ SchemaChangeTypeCreated = "CREATED" // Schema object was created
+ SchemaChangeTypeUpdated = "UPDATED" // Schema object was modified
+ SchemaChangeTypeDropped = "DROPPED" // Schema object was removed
+)
- cache map[string]*KeyspaceMetadata
+type schemaMeta struct {
+ keyspaceMeta map[string]*KeyspaceMetadata
}
// creates a session bound schema describer which will query and cache
// keyspace metadata
-func newSchemaDescriber(session *Session) *schemaDescriber {
- return &schemaDescriber{
+func newSchemaDescriber(session *Session, schemaRefresher *refreshDebouncer)
*schemaDescriber {
+ meta := new(schemaMeta)
+ describer := &schemaDescriber{
session: session,
- cache: map[string]*KeyspaceMetadata{},
}
+ describer.schemaMeta.Store(meta)
+ describer.schemaRefresher = schemaRefresher
+ return describer
+}
+
+func (s *schemaDescriber) getSchemaMetaForRead() *schemaMeta {
+ meta, _ := s.schemaMeta.Load().(*schemaMeta)
+ return meta
+}
+
+func (s *schemaDescriber) getSchemaMetaForUpdate() *schemaMeta {
+ meta := s.getSchemaMetaForRead()
+ metaNew := new(schemaMeta)
+ if meta != nil {
+ *metaNew = *meta
+ }
+ return metaNew
}
// returns the cached KeyspaceMetadata held by the describer for the named
// keyspace.
func (s *schemaDescriber) getSchema(keyspaceName string) (*KeyspaceMetadata,
error) {
- s.mu.Lock()
- defer s.mu.Unlock()
+ if s.session.cfg.MetadataCacheMode == Disabled {
+ return s.fetchSchema(keyspaceName)
+ }
+ metadata, found := s.getSchemaMetaForRead().keyspaceMeta[keyspaceName]
- metadata, found := s.cache[keyspaceName]
if !found {
- // refresh the cache for this keyspace
- err := s.refreshSchema(keyspaceName)
- if err != nil {
- return nil, err
- }
-
- metadata = s.cache[keyspaceName]
+ return nil, ErrKeyspaceDoesNotExist
}
return metadata, nil
}
-// clears the already cached keyspace metadata
-func (s *schemaDescriber) clearSchema(keyspaceName string) {
- s.mu.Lock()
- defer s.mu.Unlock()
-
- delete(s.cache, keyspaceName)
-}
-
-// forcibly updates the current KeyspaceMetadata held by the schema describer
-// for a given named keyspace.
-func (s *schemaDescriber) refreshSchema(keyspaceName string) error {
+func (s *schemaDescriber) fetchSchema(keyspaceName string) (*KeyspaceMetadata,
error) {
var err error
// query the system keyspace for schema data
// TODO retrieve concurrently
keyspace, err := getKeyspaceMetadata(s.session, keyspaceName)
if err != nil {
- return err
+ return nil, err
}
tables, err := getTableMetadata(s.session, keyspaceName)
if err != nil {
- return err
+ return nil, err
}
columns, err := getColumnMetadata(s.session, keyspaceName)
if err != nil {
- return err
+ return nil, err
}
functions, err := getFunctionsMetadata(s.session, keyspaceName)
if err != nil {
- return err
+ return nil, err
}
aggregates, err := getAggregatesMetadata(s.session, keyspaceName)
if err != nil {
- return err
+ return nil, err
}
userTypes, err := getUserTypeMetadata(s.session, keyspaceName)
if err != nil {
- return err
+ return nil, err
}
materializedViews, err := getMaterializedViewsMetadata(s.session,
keyspaceName)
if err != nil {
- return err
+ return nil, err
}
// organize the schema data
compileMetadata(s.session, keyspace, tables, columns, functions,
aggregates, userTypes,
materializedViews)
- // update the cache
- s.cache[keyspaceName] = keyspace
+ return keyspace, nil
+}
+// forcibly updates the current KeyspaceMetadata held by the schema describer
+// for all the keyspaces.
+// This function is called via schemaRefresher refreshDebouncer to batch and
+// debounce schema refresh requests.
+func refreshSchemas(session *Session) error {
+ start := time.Now()
+ defer func() {
+ elapsed := time.Since(start)
+ session.logger.Debug("Schema refresh completed",
Review Comment:
I still think this is misleading; I really think this message should say
"failed", or just not be logged at all, if the refresh failed.
##########
cluster.go:
##########
@@ -42,6 +42,36 @@ type PoolConfig struct {
HostSelectionPolicy HostSelectionPolicy
}
+// MetadataCacheMode controls how the driver reads and caches schema metadata
from Cassandra system tables.
+// This affects the behavior of Session.KeyspaceMetadata and token-aware host
selection policies.
+//
+// See the individual mode constants (Full, KeyspaceOnly, Disabled) for
detailed behavior of each mode.
+type MetadataCacheMode int
+
+const (
+ // Full mode reads and caches all schema metadata including keyspaces,
tables, columns,
+ // functions, aggregates, user-defined types, and materialized views.
+ //
+ // Token-aware routing works normally with full replica information.
Review Comment:
`Token-aware routing works normally (if TokenAwareHostPolicy is used) with
full replica information.`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]