This is an automated email from the ASF dual-hosted git repository.
worryg0d pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-gocql-driver.git
The following commit(s) were added to refs/heads/trunk by this push:
new e78b47de Fix panic when using a HostFilter and keyspace is not
replicated to every DC
e78b47de is described below
commit e78b47de52873b27845ac8c6b96228f3706a5b12
Author: Bohdan Siryk <[email protected]>
AuthorDate: Thu May 7 13:09:59 2026 +0300
Fix panic when using a HostFilter and keyspace is not replicated to every DC
Previously, networkTopology.replicaMap didn't take into account the fact
that the driver might filter out
hosts of specific DCs by using a HostFilter when computing the number of DCs
with replicas for a keyspace.
It wasn't problematic before, as the tokenAwareHostPolicy was intended to
work only with the session-level keyspace.
However, it has been problematic since the v2.1.0 release, which enabled the
policy to work with all keyspaces in the cluster.
This patch makes networkTopology.replicaMap dc-aware.
Patch by Bohdan Siryk; reviewed by João Reis for CASSGO-122
---
CHANGELOG.md | 6 ++++++
policies.go | 4 ++--
topology.go | 35 ++++++++++++++++++++++++++---------
topology_test.go | 36 ++++++++++++++++++++++++++++++++++--
4 files changed, 68 insertions(+), 13 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3b07b9bf..cba0d801 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,12 @@ All notable changes to this project will be documented in this
file.
The format is based on [Keep a
Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic
Versioning](https://semver.org/spec/v2.0.0.html).
+## [2.1.2]
+
+### Fixed
+
+- Prevent panic when using a HostFilter and keyspace is not replicated to
every DC (CASSGO-122)
+
## [2.1.1]
### Fixed
diff --git a/policies.go b/policies.go
index 41f40780..3d880e97 100644
--- a/policies.go
+++ b/policies.go
@@ -524,7 +524,7 @@ func (t *tokenAwareHostPolicy) updateReplicas(keyspace
string) {
if _, ok := meta.replicas[key]; !ok {
metaUpdate := t.getMetadataForUpdate()
newReplicas :=
make(map[string]tokenRingReplicas, len(meta.replicas))
- newReplicas[key] =
strat.replicaMap(metaUpdate.tokenRing)
+ newReplicas[key] =
strat.replicaMap(metaUpdate.tokenRing, t.logger)
for k, replicas := range
metaUpdate.replicas {
newReplicas[k] = replicas
}
@@ -556,7 +556,7 @@ func (t *tokenAwareHostPolicy) updateAllReplicas(meta
*clusterMeta, schemaMeta *
if meta != nil && meta.tokenRing != nil {
key := strat.strategyKey()
if _, ok := newReplicas[key]; !ok {
- newReplicas[key] =
strat.replicaMap(meta.tokenRing)
+ newReplicas[key] =
strat.replicaMap(meta.tokenRing, t.logger)
}
}
}
diff --git a/topology.go b/topology.go
index 3ec20b15..0481b2ff 100644
--- a/topology.go
+++ b/topology.go
@@ -66,7 +66,7 @@ func (h tokenRingReplicas) replicasFor(t token) *hostTokens {
}
type placementStrategy interface {
- replicaMap(tokenRing *tokenRing) tokenRingReplicas
+ replicaMap(tokenRing *tokenRing, logger StructuredLogger)
tokenRingReplicas
replicationFactor(dc string) int
// strategyKey returns a unique identifier string for this strategy
instance.
// Two strategy instances with identical configuration should return
the same key.
@@ -162,7 +162,7 @@ func (s *simpleStrategy) replicationFactor(dc string) int {
return s.rf
}
-func (s *simpleStrategy) replicaMap(tokenRing *tokenRing) tokenRingReplicas {
+func (s *simpleStrategy) replicaMap(tokenRing *tokenRing, _ StructuredLogger)
tokenRingReplicas {
tokens := tokenRing.tokens
ring := make(tokenRingReplicas, len(tokens))
@@ -255,7 +255,7 @@ func (n *networkTopology) haveRF(replicaCounts
map[string]int) bool {
return true
}
-func (n *networkTopology) replicaMap(tokenRing *tokenRing) tokenRingReplicas {
+func (n *networkTopology) replicaMap(tokenRing *tokenRing, logger
StructuredLogger) tokenRingReplicas {
dcRacks := make(map[string]map[string]struct{}, len(n.dcs))
// skipped hosts in a dc
skipped := make(map[string][]*HostInfo, len(n.dcs))
@@ -324,7 +324,11 @@ func (n *networkTopology) replicaMap(tokenRing *tokenRing)
tokenRingReplicas {
continue
} else if replicasInDC[dc] >= rf {
if replicasInDC[dc] > rf {
- panic(fmt.Sprintf("replica overflow.
rf=%d have=%d in dc %q", rf, replicasInDC[dc], dc))
+ logger.Warning("Replica overflow.
Returning empty map.",
+ NewLogFieldInt("rf", rf),
+ NewLogFieldInt("have",
replicasInDC[dc]),
+ NewLogFieldString("dc", dc))
+ return tokenRingReplicas{}
}
// have enough replicas in this DC
@@ -372,23 +376,36 @@ func (n *networkTopology) replicaMap(tokenRing
*tokenRing) tokenRingReplicas {
}
if len(replicas) == 0 {
- panic(fmt.Sprintf("no replicas for token: %v",
th.token))
+ logger.Warning("No replicas for token. Returning empty
map.",
+ NewLogFieldString("token", th.token.String()))
+ return tokenRingReplicas{}
} else if !replicas[0].Equal(th.host) {
- panic(fmt.Sprintf("first replica is not the primary
replica for the token: expected %v got %v", replicas[0].ConnectAddress(),
th.host.ConnectAddress()))
+ logger.Warning("First replica is not the primary
replica for the token. Returning empty map.",
+ NewLogFieldString("token", th.token.String()),
+ NewLogFieldIP("expected",
replicas[0].ConnectAddress()),
+ NewLogFieldIP("got", th.host.ConnectAddress()))
+ return tokenRingReplicas{}
}
replicaRing = append(replicaRing, hostTokens{th.token,
replicas})
}
dcsWithReplicas := 0
- for _, dc := range n.dcs {
- if dc > 0 {
+ for dc, rf := range n.dcs {
+ // We should count only DCs that driver is aware of and have a
replication factor > 0
+ if _, knownDc := dcRacks[dc]; knownDc && rf > 0 {
dcsWithReplicas++
}
}
if dcsWithReplicas == len(dcRacks) && len(replicaRing) != len(tokens) {
- panic(fmt.Sprintf("token map different size to token ring: got
%d expected %d", len(replicaRing), len(tokens)))
+ logger.Warning("Unexpected state while building replica map.
Returning empty map.",
+ NewLogFieldString("strategy_key", n.strategyKey()),
+ NewLogFieldInt("dcs_with_replicas", dcsWithReplicas),
+ NewLogFieldInt("dcs_in_ring", len(dcRacks)),
+ NewLogFieldInt("token_ring_size", len(tokens)),
+ NewLogFieldInt("replica_ring_size", len(replicaRing)))
+ return tokenRingReplicas{}
}
return replicaRing
diff --git a/topology_test.go b/topology_test.go
index 10ee85e5..3e8fb9a2 100644
--- a/topology_test.go
+++ b/topology_test.go
@@ -31,6 +31,8 @@ import (
"fmt"
"sort"
"testing"
+
+ "github.com/stretchr/testify/require"
)
func TestPlacementStrategy_SimpleStrategy(t *testing.T) {
@@ -49,7 +51,7 @@ func TestPlacementStrategy_SimpleStrategy(t *testing.T) {
hosts := []*HostInfo{host0, host25, host50, host75}
strat := newSimpleStrategy(2)
- tokenReplicas := strat.replicaMap(&tokenRing{hosts: hosts, tokens:
tokens})
+ tokenReplicas := strat.replicaMap(&tokenRing{hosts: hosts, tokens:
tokens}, nopLoggerSingleton)
if len(tokenReplicas) != len(tokens) {
t.Fatalf("expected replica map to have %d items but has %d",
len(tokens), len(tokenReplicas))
}
@@ -157,7 +159,7 @@ func TestPlacementStrategy_NetworkStrategy(t *testing.T) {
expReplicas += rf
}
- tokenReplicas :=
test.strat.replicaMap(&tokenRing{hosts: hosts, tokens: tokens})
+ tokenReplicas :=
test.strat.replicaMap(&tokenRing{hosts: hosts, tokens: tokens},
nopLoggerSingleton)
if len(tokenReplicas) != test.expectedReplicaMapSize {
t.Fatalf("expected replica map to have %d items
but has %d", test.expectedReplicaMapSize,
len(tokenReplicas))
@@ -224,3 +226,33 @@ func TestPlacementStrategy_NetworkStrategy(t *testing.T) {
})
}
}
+
+// Regression test for CASSGO-122:
+// when the token ring only contains hosts from a DC that has RF=0/unspecified
for a keyspace,
+// networkTopology.replicaMap should return an empty replica map.
+func
TestPlacementStrategy_NetworkStrategy_ReturnEmptyReplicaMapWhenNoReplicasInRing(t
*testing.T) {
+ strat := newNetworkTopology(map[string]int{
+ "dc1": 3, // replicated only in dc1
+ })
+
+ // Hosts in ring only from dc2, so no replicas should be returned.
+ // hostId format: dc:rack:host which is used as a token in the token
ring.
+ // It makes sense to use the hostId as a token in the token ring
because it is unique and deterministic for test purpose.
+ hosts := []*HostInfo{
+ {hostId: "dc2:rack1:0", dataCenter: "dc2", rack: "rack1"},
+ {hostId: "dc2:rack2:1", dataCenter: "dc2", rack: "rack2"},
+ {hostId: "dc2:rack3:2", dataCenter: "dc2", rack: "rack3"},
+ }
+
+ tokens := make([]hostToken, 0, len(hosts))
+ for _, h := range hosts {
+ tokens = append(tokens, hostToken{
+ token: orderedToken(h.hostId),
+ host: h,
+ })
+ }
+ sort.Sort(&tokenRing{tokens: tokens})
+
+ replicas := strat.replicaMap(&tokenRing{hosts: hosts, tokens: tokens},
nopLoggerSingleton)
+ require.Empty(t, replicas, "expected no replicas, got %d",
len(replicas))
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]