This is an automated email from the ASF dual-hosted git repository.
joaoreis pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-gocql-driver.git
The following commit(s) were added to refs/heads/trunk by this push:
new f3d13d4 Add support for cassandra 4.0 table options
f3d13d4 is described below
commit f3d13d4d0bf4e4cee186a47ed5a2c5e5a7fe3e54
Author: mykyta.oleksiienko <[email protected]>
AuthorDate: Wed Dec 11 12:56:26 2024 +0200
Add support for cassandra 4.0 table options
This PR implements backward compatibility with previous versions
and adds support for the new types. To make metadata table support easier
for future Cassandra versions, the hard-coded scan from Cassandra was
replaced with a new "parseSystemSchemaViews" method, which is much easier
to extend: even if some fields are added in the middle of the table, it
won't be an issue anymore.
patch by Mykyta Oleksiienko; reviewed by Joao Reis and James Harting
CASSGO-13
---
CHANGELOG.md | 2 +
cassandra_test.go | 25 ++++--
metadata.go | 238 ++++++++++++++++++++++++++++++++++++++++++------------
3 files changed, 208 insertions(+), 57 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 72945a6..5cfff09 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -21,6 +21,8 @@ and this project adheres to [Semantic
Versioning](https://semver.org/spec/v2.0.0
- Change Batch API to be consistent with Query() (CASSGO-7)
+- Added Cassandra 4.0 table options support (CASSGO-13)
+
- Remove deprecated global logger (CASSGO-24)
- Bumped actions/upload-artifact and actions/cache versions to v4 in CI
workflow (CASSGO-48)
diff --git a/cassandra_test.go b/cassandra_test.go
index 9fa9aba..545b307 100644
--- a/cassandra_test.go
+++ b/cassandra_test.go
@@ -2328,7 +2328,7 @@ func TestGetColumnMetadata(t *testing.T) {
func TestMaterializedViewMetadata(t *testing.T) {
if flagCassVersion.Before(3, 0, 0) {
- return
+ t.Skip("The Cassandra version is too old")
}
session := createSession(t)
defer session.Close()
@@ -2347,14 +2347,19 @@ func TestMaterializedViewMetadata(t *testing.T) {
expectedChunkLengthInKB := "16"
expectedDCLocalReadRepairChance := float64(0)
expectedSpeculativeRetry := "99p"
+ expectedAdditionalWritePolicy := "99p"
+ expectedReadRepair := "BLOCKING"
if flagCassVersion.Before(4, 0, 0) {
expectedChunkLengthInKB = "64"
expectedDCLocalReadRepairChance = 0.1
expectedSpeculativeRetry = "99PERCENTILE"
+ expectedReadRepair = ""
+ expectedAdditionalWritePolicy = ""
}
expectedView1 := MaterializedViewMetadata{
Keyspace: "gocql_test",
Name: "view_view",
+ AdditionalWritePolicy: expectedAdditionalWritePolicy,
baseTableName: "view_table",
BloomFilterFpChance: 0.01,
Caching: map[string]string{"keys": "ALL",
"rows_per_partition": "NONE"},
@@ -2366,12 +2371,17 @@ func TestMaterializedViewMetadata(t *testing.T) {
DefaultTimeToLive: 0,
Extensions: map[string]string{},
GcGraceSeconds: 864000,
- IncludeAllColumns: false, MaxIndexInterval: 2048,
MemtableFlushPeriodInMs: 0, MinIndexInterval: 128, ReadRepairChance: 0,
- SpeculativeRetry: expectedSpeculativeRetry,
+ IncludeAllColumns: false, MaxIndexInterval: 2048,
+ MemtableFlushPeriodInMs: 0,
+ MinIndexInterval: 128,
+ ReadRepair: expectedReadRepair,
+ ReadRepairChance: 0,
+ SpeculativeRetry: expectedSpeculativeRetry,
}
expectedView2 := MaterializedViewMetadata{
Keyspace: "gocql_test",
Name: "view_view2",
+ AdditionalWritePolicy: expectedAdditionalWritePolicy,
baseTableName: "view_table2",
BloomFilterFpChance: 0.01,
Caching: map[string]string{"keys": "ALL",
"rows_per_partition": "NONE"},
@@ -2383,8 +2393,13 @@ func TestMaterializedViewMetadata(t *testing.T) {
DefaultTimeToLive: 0,
Extensions: map[string]string{},
GcGraceSeconds: 864000,
- IncludeAllColumns: false, MaxIndexInterval: 2048,
MemtableFlushPeriodInMs: 0, MinIndexInterval: 128, ReadRepairChance: 0,
- SpeculativeRetry: expectedSpeculativeRetry,
+ IncludeAllColumns: false,
+ MaxIndexInterval: 2048,
+ MemtableFlushPeriodInMs: 0,
+ MinIndexInterval: 128,
+ ReadRepair: expectedReadRepair,
+ ReadRepairChance: 0,
+ SpeculativeRetry: expectedSpeculativeRetry,
}
expectedView1.BaseTableId = materializedViews[0].BaseTableId
diff --git a/metadata.go b/metadata.go
index e8eb473..63e27ae 100644
--- a/metadata.go
+++ b/metadata.go
@@ -111,6 +111,7 @@ type AggregateMetadata struct {
type MaterializedViewMetadata struct {
Keyspace string
Name string
+ AdditionalWritePolicy string
BaseTableId UUID
BaseTable *TableMetadata
BloomFilterFpChance float64
@@ -128,7 +129,8 @@ type MaterializedViewMetadata struct {
MaxIndexInterval int
MemtableFlushPeriodInMs int
MinIndexInterval int
- ReadRepairChance float64
+ ReadRepair string // Only present in Cassandra 4.0+
+ ReadRepairChance float64 // Note: Cassandra 4.0 removed
ReadRepairChance and added ReadRepair instead
SpeculativeRetry string
baseTableName string
@@ -975,69 +977,201 @@ func getUserTypeMetadata(session *Session, keyspaceName
string) ([]UserTypeMetad
return uTypes, nil
}
+func bytesMapToStringsMap(byteData map[string][]byte) map[string]string {
+ extensions := make(map[string]string, len(byteData))
+ for key, rowByte := range byteData {
+ extensions[key] = string(rowByte)
+ }
+
+ return extensions
+}
+
+func materializedViewMetadataFromMap(currentObject map[string]interface{},
materializedView *MaterializedViewMetadata) error {
+ const errorMessage = "gocql.materializedViewMetadataFromMap failed to
read column %s"
+ var ok bool
+ for key, value := range currentObject {
+ switch key {
+ case "keyspace_name":
+ materializedView.Keyspace, ok = value.(string)
+ if !ok {
+ return fmt.Errorf(errorMessage, key)
+ }
+
+ case "view_name":
+ materializedView.Name, ok = value.(string)
+ if !ok {
+ return fmt.Errorf(errorMessage, key)
+ }
+
+ case "additional_write_policy":
+ materializedView.AdditionalWritePolicy, ok =
value.(string)
+ if !ok {
+ return fmt.Errorf(errorMessage, key)
+ }
+
+ case "base_table_id":
+ materializedView.BaseTableId, ok = value.(UUID)
+ if !ok {
+ return fmt.Errorf(errorMessage, key)
+ }
+
+ case "base_table_name":
+ materializedView.baseTableName, ok = value.(string)
+ if !ok {
+ return fmt.Errorf(errorMessage, key)
+ }
+
+ case "bloom_filter_fp_chance":
+ materializedView.BloomFilterFpChance, ok =
value.(float64)
+ if !ok {
+ return fmt.Errorf(errorMessage, key)
+ }
+
+ case "caching":
+ materializedView.Caching, ok = value.(map[string]string)
+ if !ok {
+ return fmt.Errorf(errorMessage, key)
+ }
+
+ case "comment":
+ materializedView.Comment, ok = value.(string)
+ if !ok {
+ return fmt.Errorf(errorMessage, key)
+ }
+
+ case "compaction":
+ materializedView.Compaction, ok =
value.(map[string]string)
+ if !ok {
+ return fmt.Errorf(errorMessage, key)
+ }
+
+ case "compression":
+ materializedView.Compression, ok =
value.(map[string]string)
+ if !ok {
+ return fmt.Errorf(errorMessage, key)
+ }
+
+ case "crc_check_chance":
+ materializedView.CrcCheckChance, ok = value.(float64)
+ if !ok {
+ return fmt.Errorf(errorMessage, key)
+ }
+
+ case "dclocal_read_repair_chance":
+ materializedView.DcLocalReadRepairChance, ok =
value.(float64)
+ if !ok {
+ return fmt.Errorf(errorMessage, key)
+ }
+
+ case "default_time_to_live":
+ materializedView.DefaultTimeToLive, ok = value.(int)
+ if !ok {
+ return fmt.Errorf(errorMessage, key)
+ }
+
+ case "extensions":
+ byteData, ok := value.(map[string][]byte)
+ if !ok {
+ return fmt.Errorf(errorMessage, key)
+ }
+
+ materializedView.Extensions =
bytesMapToStringsMap(byteData)
+
+ case "gc_grace_seconds":
+ materializedView.GcGraceSeconds, ok = value.(int)
+ if !ok {
+ return fmt.Errorf(errorMessage, key)
+ }
+
+ case "id":
+ materializedView.Id, ok = value.(UUID)
+ if !ok {
+ return fmt.Errorf(errorMessage, key)
+ }
+
+ case "include_all_columns":
+ materializedView.IncludeAllColumns, ok = value.(bool)
+ if !ok {
+ return fmt.Errorf(errorMessage, key)
+ }
+
+ case "max_index_interval":
+ materializedView.MaxIndexInterval, ok = value.(int)
+ if !ok {
+ return fmt.Errorf(errorMessage, key)
+ }
+
+ case "memtable_flush_period_in_ms":
+ materializedView.MemtableFlushPeriodInMs, ok =
value.(int)
+ if !ok {
+ return fmt.Errorf(errorMessage, key)
+ }
+
+ case "min_index_interval":
+ materializedView.MinIndexInterval, ok = value.(int)
+ if !ok {
+ return fmt.Errorf(errorMessage, key)
+ }
+
+ case "read_repair":
+ materializedView.ReadRepair, ok = value.(string)
+ if !ok {
+ return fmt.Errorf(errorMessage, key)
+ }
+
+ case "read_repair_chance":
+ materializedView.ReadRepairChance, ok = value.(float64)
+ if !ok {
+ return fmt.Errorf(errorMessage, key)
+ }
+
+ case "speculative_retry":
+ materializedView.SpeculativeRetry, ok = value.(string)
+ if !ok {
+ return fmt.Errorf(errorMessage, key)
+ }
+
+ }
+ }
+ return nil
+}
+
+func parseSystemSchemaViews(iter *Iter) ([]MaterializedViewMetadata, error) {
+ var materializedViews []MaterializedViewMetadata
+ s, err := iter.SliceMap()
+ if err != nil {
+ return nil, err
+ }
+
+ for _, row := range s {
+ var materializedView MaterializedViewMetadata
+ err = materializedViewMetadataFromMap(row, &materializedView)
+ if err != nil {
+ return nil, err
+ }
+
+ materializedViews = append(materializedViews, materializedView)
+ }
+
+ return materializedViews, nil
+}
+
func getMaterializedViewsMetadata(session *Session, keyspaceName string)
([]MaterializedViewMetadata, error) {
if !session.useSystemSchema {
return nil, nil
}
var tableName = "system_schema.views"
stmt := fmt.Sprintf(`
- SELECT
- view_name,
- base_table_id,
- base_table_name,
- bloom_filter_fp_chance,
- caching,
- comment,
- compaction,
- compression,
- crc_check_chance,
- dclocal_read_repair_chance,
- default_time_to_live,
- extensions,
- gc_grace_seconds,
- id,
- include_all_columns,
- max_index_interval,
- memtable_flush_period_in_ms,
- min_index_interval,
- read_repair_chance,
- speculative_retry
+ SELECT *
FROM %s
WHERE keyspace_name = ?`, tableName)
var materializedViews []MaterializedViewMetadata
- rows := session.control.query(stmt, keyspaceName).Scanner()
- for rows.Next() {
- materializedView := MaterializedViewMetadata{Keyspace:
keyspaceName}
- err := rows.Scan(&materializedView.Name,
- &materializedView.BaseTableId,
- &materializedView.baseTableName,
- &materializedView.BloomFilterFpChance,
- &materializedView.Caching,
- &materializedView.Comment,
- &materializedView.Compaction,
- &materializedView.Compression,
- &materializedView.CrcCheckChance,
- &materializedView.DcLocalReadRepairChance,
- &materializedView.DefaultTimeToLive,
- &materializedView.Extensions,
- &materializedView.GcGraceSeconds,
- &materializedView.Id,
- &materializedView.IncludeAllColumns,
- &materializedView.MaxIndexInterval,
- &materializedView.MemtableFlushPeriodInMs,
- &materializedView.MinIndexInterval,
- &materializedView.ReadRepairChance,
- &materializedView.SpeculativeRetry,
- )
- if err != nil {
- return nil, err
- }
- materializedViews = append(materializedViews, materializedView)
- }
+ iter := session.control.query(stmt, keyspaceName)
- if err := rows.Err(); err != nil {
+ materializedViews, err := parseSystemSchemaViews(iter)
+ if err != nil {
return nil, err
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]