joao-r-reis commented on code in PR #1926:
URL:
https://github.com/apache/cassandra-gocql-driver/pull/1926#discussion_r2747391286
##########
events.go:
##########
@@ -133,27 +133,42 @@ func (s *Session) handleEvent(framer *framer) {
}
func (s *Session) handleSchemaEvent(frames []frame) {
- // TODO: debounce events
+
+ if len(frames) > 0 {
+ s.control.awaitSchemaAgreement()
+ }
+
+ // Collect unique keyspaces that need schema refresh
+ keyspacesToRefresh := make(map[string]struct{})
+
for _, frame := range frames {
switch f := frame.(type) {
case *schemaChangeKeyspace:
s.schemaDescriber.clearSchema(f.keyspace)
- s.handleKeyspaceChange(f.keyspace, f.change)
+ s.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: f.keyspace, Change: f.change})
+ keyspacesToRefresh[f.keyspace] = struct{}{}
case *schemaChangeTable:
s.schemaDescriber.clearSchema(f.keyspace)
+ keyspacesToRefresh[f.keyspace] = struct{}{}
case *schemaChangeAggregate:
s.schemaDescriber.clearSchema(f.keyspace)
+ keyspacesToRefresh[f.keyspace] = struct{}{}
case *schemaChangeFunction:
s.schemaDescriber.clearSchema(f.keyspace)
+ keyspacesToRefresh[f.keyspace] = struct{}{}
case *schemaChangeType:
s.schemaDescriber.clearSchema(f.keyspace)
+ keyspacesToRefresh[f.keyspace] = struct{}{}
}
}
-}
-func (s *Session) handleKeyspaceChange(keyspace, change string) {
- s.control.awaitSchemaAgreement()
- s.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: keyspace, Change: change})
+ for keyspace := range keyspacesToRefresh {
+ if _, err := s.schemaDescriber.getSchema(keyspace); err != nil {
Review Comment:
The refresh schema operation should also log the time it took to do the full
refresh at debug level so that it is easier for us to benchmark its performance
and troubleshoot potential issues.
##########
events.go:
##########
@@ -133,27 +133,42 @@ func (s *Session) handleEvent(framer *framer) {
}
func (s *Session) handleSchemaEvent(frames []frame) {
- // TODO: debounce events
+
+ if len(frames) > 0 {
+ s.control.awaitSchemaAgreement()
+ }
+
+ // Collect unique keyspaces that need schema refresh
+ keyspacesToRefresh := make(map[string]struct{})
+
for _, frame := range frames {
switch f := frame.(type) {
case *schemaChangeKeyspace:
s.schemaDescriber.clearSchema(f.keyspace)
- s.handleKeyspaceChange(f.keyspace, f.change)
+ s.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: f.keyspace, Change: f.change})
+ keyspacesToRefresh[f.keyspace] = struct{}{}
case *schemaChangeTable:
s.schemaDescriber.clearSchema(f.keyspace)
+ keyspacesToRefresh[f.keyspace] = struct{}{}
case *schemaChangeAggregate:
s.schemaDescriber.clearSchema(f.keyspace)
+ keyspacesToRefresh[f.keyspace] = struct{}{}
case *schemaChangeFunction:
s.schemaDescriber.clearSchema(f.keyspace)
+ keyspacesToRefresh[f.keyspace] = struct{}{}
case *schemaChangeType:
s.schemaDescriber.clearSchema(f.keyspace)
+ keyspacesToRefresh[f.keyspace] = struct{}{}
}
}
-}
-func (s *Session) handleKeyspaceChange(keyspace, change string) {
- s.control.awaitSchemaAgreement()
- s.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: keyspace, Change: change})
+ for keyspace := range keyspacesToRefresh {
+ if _, err := s.schemaDescriber.getSchema(keyspace); err != nil {
Review Comment:
Just to reiterate, we should borrow from the Java driver implementation as
much as possible for things that we know work well, and this is one of those
cases. The Java driver has a debouncer that does a full refresh of the schema:
it does a `SELECT *` on everything and recomputes the entire schema for every
keyspace every time. This is OK because the refresh sits behind a debouncer, so
even if a lot of events come in, handling each event is just a "schedule"
request on the debouncer, and at some point the debouncer runs the refresh
schema operation on all keyspaces. This is a simpler but effective approach
that I think we should take here.
Technically there is already an `eventdebouncer` (different from the one I
added), but it doesn't support requesting a "flush now" operation, which is
part of the reason why I added mine (`refreshDebouncer`) for the node events.
The `eventdebouncer` for node/topology events basically just schedules things
on the refresh debouncer. I think we can do exactly the same for the schema
events with a new refresh debouncer instance (I think the same type can be
reused: you provide a custom refresh function that is called when flushing).
We could improve on it by collecting the keyspaces from the events and only
processing the keyspaces that were changed (even though we SELECT everything,
it would save some computation time), but that is not needed for the first
implementation of this.
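For reference, the optional improvement could be sketched roughly as below:
events add keyspace names to a set, and the refresh drains the set and only
recomputes those keyspaces. `pendingKeyspaces`, `add` and `drain` are
hypothetical names introduced for this example, not existing driver code.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// pendingKeyspaces is a hypothetical, concurrency-safe set of keyspace names
// that changed since the last refresh.
type pendingKeyspaces struct {
	mu  sync.Mutex
	set map[string]struct{}
}

// add records that a keyspace was touched by a schema event.
func (p *pendingKeyspaces) add(keyspace string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.set == nil {
		p.set = make(map[string]struct{})
	}
	p.set[keyspace] = struct{}{}
}

// drain returns the recorded keyspaces in sorted order and resets the set,
// so the next refresh only sees keyspaces changed after this call.
func (p *pendingKeyspaces) drain() []string {
	p.mu.Lock()
	defer p.mu.Unlock()
	names := make([]string, 0, len(p.set))
	for ks := range p.set {
		names = append(names, ks)
	}
	sort.Strings(names)
	p.set = nil
	return names
}

func main() {
	var p pendingKeyspaces
	// Three events touching two distinct keyspaces.
	p.add("ks1")
	p.add("ks2")
	p.add("ks1")
	fmt.Println(p.drain()) // prints [ks1 ks2]
}
```

The refresh function passed to the debouncer would call `drain` and compute
metadata only for the returned names.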