This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 0446ae1a Add Replication Tests to Index Mode Measures (#893)
0446ae1a is described below
commit 0446ae1a509894742cd0e57778b9f418411e29ef
Author: Trista Pan <[email protected]>
AuthorDate: Wed Jan 21 21:26:57 2026 +0800
Add Replication Tests to Index Mode Measures (#893)
* Add an integration: replication for measure
---------
Co-authored-by: Copilot <[email protected]>
Co-authored-by: 吴晟 Wu Sheng <[email protected]>
Co-authored-by: Gao Hongtao <[email protected]>
---
CHANGES.md | 1 +
.../measure/testdata/group_stages/replicated.json | 34 ++++
pkg/test/measure/testdata/groups/replicated.json | 19 +++
.../measures/service_traffic_replicated.json | 44 +++++
test/cases/init.go | 1 +
test/cases/measure/data/input/entity_replicated.ql | 21 +++
.../measure/data/input/entity_replicated.yaml | 30 ++++
.../cases/measure/data/want/entity_replicated.yaml | 37 +++++
.../replication/replication_suite_test.go | 182 +++++++++++++++++++++
test/integration/replication/replication_test.go | 137 ++++++++++++++++
10 files changed, 506 insertions(+)
diff --git a/CHANGES.md b/CHANGES.md
index 2232be11..f7a9d528 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -16,6 +16,7 @@ Release Notes.
- Support writing data with specifications.
- Persist series metadata in liaison queue for measure, stream and trace models.
- Update the dump tool to support analyzing the parts with smeta files.
+- Add replication integration test for measure.
- Activate the property repair mechanism by default.
- Add snapshot time retention policy to ensure the snapshot only can be deleted after the configured minimum age(time).
diff --git a/pkg/test/measure/testdata/group_stages/replicated.json b/pkg/test/measure/testdata/group_stages/replicated.json
new file mode 100644
index 00000000..5d07b9ab
--- /dev/null
+++ b/pkg/test/measure/testdata/group_stages/replicated.json
@@ -0,0 +1,34 @@
+{
+ "metadata": {
+ "name": "replicated_group"
+ },
+ "catalog": "CATALOG_MEASURE",
+ "resource_opts": {
+ "shard_num": 2,
+ "segment_interval": {
+ "unit": "UNIT_DAY",
+ "num": 1
+ },
+ "ttl": {
+ "unit": "UNIT_DAY",
+ "num": 7
+ },
+ "replicas": 2,
+ "stages": [
+ {
+ "name": "warm",
+ "shard_num": 1,
+ "segment_interval": {
+ "unit": "UNIT_DAY",
+ "num": 3
+ },
+ "ttl": {
+ "unit": "UNIT_DAY",
+ "num": 7
+ },
+ "node_selector": "type=warm"
+ }
+ ]
+ },
+ "updated_at": "2021-04-15T01:30:15.01Z"
+}
diff --git a/pkg/test/measure/testdata/groups/replicated.json b/pkg/test/measure/testdata/groups/replicated.json
new file mode 100644
index 00000000..98c30b4d
--- /dev/null
+++ b/pkg/test/measure/testdata/groups/replicated.json
@@ -0,0 +1,19 @@
+{
+ "metadata": {
+ "name": "replicated_group"
+ },
+ "catalog": "CATALOG_MEASURE",
+ "resource_opts": {
+ "shard_num": 2,
+ "segment_interval": {
+ "unit": "UNIT_DAY",
+ "num": 1
+ },
+ "ttl": {
+ "unit": "UNIT_DAY",
+ "num": 7
+ },
+ "replicas": 2
+ },
+ "updated_at": "2021-04-15T01:30:15.01Z"
+}
diff --git a/pkg/test/measure/testdata/measures/service_traffic_replicated.json b/pkg/test/measure/testdata/measures/service_traffic_replicated.json
new file mode 100644
index 00000000..73082adb
--- /dev/null
+++ b/pkg/test/measure/testdata/measures/service_traffic_replicated.json
@@ -0,0 +1,44 @@
+{
+ "metadata": {
+ "name": "service_traffic",
+ "group": "replicated_group"
+ },
+ "tag_families": [
+ {
+ "name": "default",
+ "tags": [
+ {
+ "name": "id",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "service_id",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "name",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "short_name",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "service_group",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "layer",
+ "type": "TAG_TYPE_INT"
+ }
+ ]
+ }
+ ],
+ "entity": {
+ "tag_names": [
+ "id"
+ ]
+ },
+ "index_mode": true,
+ "updated_at": "2021-04-15T01:30:15.01Z"
+}
diff --git a/test/cases/init.go b/test/cases/init.go
index 71f3f520..677e994b 100644
--- a/test/cases/init.go
+++ b/test/cases/init.go
@@ -83,6 +83,7 @@ func Initialize(addr string, now time.Time) {
interval = time.Minute
casesmeasuredata.Write(conn, "service_traffic", "index_mode", "service_traffic_data_old.json", now.AddDate(0, 0, -2), interval)
casesmeasuredata.Write(conn, "service_traffic", "index_mode", "service_traffic_data.json", now, interval)
+ casesmeasuredata.Write(conn, "service_traffic", "replicated_group", "service_traffic_data.json", now, interval)
casesmeasuredata.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval)
casesmeasuredata.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval)
casesmeasuredata.Write(conn, "instance_clr_cpu_minute", "sw_metric", "instance_clr_cpu_minute_data.json", now, interval)
diff --git a/test/cases/measure/data/input/entity_replicated.ql b/test/cases/measure/data/input/entity_replicated.ql
new file mode 100644
index 00000000..6d915944
--- /dev/null
+++ b/test/cases/measure/data/input/entity_replicated.ql
@@ -0,0 +1,21 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+SELECT service_id, layer, name, short_name FROM MEASURE service_traffic IN replicated_group
+TIME > '-15m'
+WHERE id = '1'
diff --git a/test/cases/measure/data/input/entity_replicated.yaml b/test/cases/measure/data/input/entity_replicated.yaml
new file mode 100644
index 00000000..13454431
--- /dev/null
+++ b/test/cases/measure/data/input/entity_replicated.yaml
@@ -0,0 +1,30 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "service_traffic"
+groups: ["replicated_group"]
+tagProjection:
+ tagFamilies:
+ - name: "default"
+ tags: ["service_id", "layer", "name", "short_name"]
+criteria:
+ condition:
+ name: "id"
+ op: "BINARY_OP_EQ"
+ value:
+ str:
+ value: "1"
diff --git a/test/cases/measure/data/want/entity_replicated.yaml b/test/cases/measure/data/want/entity_replicated.yaml
new file mode 100644
index 00000000..4fc05dc5
--- /dev/null
+++ b/test/cases/measure/data/want/entity_replicated.yaml
@@ -0,0 +1,37 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+dataPoints:
+- tagFamilies:
+ - name: default
+ tags:
+ - key: service_id
+ value:
+ str:
+ value: service_1
+ - key: layer
+ value:
+ int:
+ value: "1"
+ - key: name
+ value:
+ str:
+ value: service_name_1
+ - key: short_name
+ value:
+ str:
+ value: service_short_name_1
diff --git a/test/integration/replication/replication_suite_test.go b/test/integration/replication/replication_suite_test.go
new file mode 100644
index 00000000..b4a88c2e
--- /dev/null
+++ b/test/integration/replication/replication_suite_test.go
@@ -0,0 +1,182 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package replication_test
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "testing"
+ "time"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+ "github.com/onsi/gomega/gleak"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+
+ "github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
+ "github.com/apache/skywalking-banyandb/pkg/test"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
+ "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
+ "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+ test_measure "github.com/apache/skywalking-banyandb/pkg/test/measure"
+ test_property "github.com/apache/skywalking-banyandb/pkg/test/property"
+ "github.com/apache/skywalking-banyandb/pkg/test/setup"
+ test_stream "github.com/apache/skywalking-banyandb/pkg/test/stream"
+ test_trace "github.com/apache/skywalking-banyandb/pkg/test/trace"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
+ test_cases "github.com/apache/skywalking-banyandb/test/cases"
+ casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
+)
+
+func TestReplication(t *testing.T) {
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "Replication Suite")
+}
+
+var (
+ deferFunc func()
+ goods []gleak.Goroutine
+ now time.Time
+ connection *grpc.ClientConn
+ liaisonAddr string
+ etcdEndpoint string
+ dataNodeClosers []func()
+)
+
+var _ = SynchronizedBeforeSuite(func() []byte {
+ Expect(logger.Init(logger.Logging{
+ Env: "dev",
+ Level: flags.LogLevel,
+ })).To(Succeed())
+ pool.EnableStackTracking(true)
+ goods = gleak.Goroutines()
+
+ By("Starting etcd server")
+ ports, err := test.AllocateFreePorts(2)
+ Expect(err).NotTo(HaveOccurred())
+ dir, spaceDef, err := test.NewSpace()
+ Expect(err).NotTo(HaveOccurred())
+ ep := fmt.Sprintf("http://127.0.0.1:%d", ports[0])
+ etcdEndpoint = ep
+
+ server, err := embeddedetcd.NewServer(
+ embeddedetcd.ConfigureListener([]string{ep}, []string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
+ embeddedetcd.RootDir(dir),
+ embeddedetcd.AutoCompactionMode("periodic"),
+ embeddedetcd.AutoCompactionRetention("1h"),
+ embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+ )
+ Expect(err).ShouldNot(HaveOccurred())
+ <-server.ReadyNotify()
+
+ By("Loading schema")
+ schemaRegistry, err := schema.NewEtcdSchemaRegistry(
+ schema.Namespace(metadata.DefaultNamespace),
+ schema.ConfigureServerEndpoints([]string{ep}),
+ )
+ Expect(err).NotTo(HaveOccurred())
+ defer schemaRegistry.Close()
+
+ ctx := context.Background()
+ // Preload all schemas since test_cases.Initialize needs them
+ test_stream.PreloadSchema(ctx, schemaRegistry)
+ test_measure.PreloadSchema(ctx, schemaRegistry)
+ test_trace.PreloadSchema(ctx, schemaRegistry)
+ test_property.PreloadSchema(ctx, schemaRegistry)
+
+ By("Starting 3 data nodes for replication test")
+ dataNodeClosers = make([]func(), 0, 3)
+
+ for i := 0; i < 3; i++ {
+ closeDataNode := setup.DataNode(ep)
+ dataNodeClosers = append(dataNodeClosers, closeDataNode)
+ }
+
+ By("Starting liaison node")
+ liaisonAddr2, closerLiaisonNode := setup.LiaisonNode(ep)
+ liaisonAddr = liaisonAddr2
+
+ By("Initializing test cases")
+ ns := timestamp.NowMilli().UnixNano()
+ now = time.Unix(0, ns-ns%int64(time.Minute))
+
+ // Initialize test data - this loads data for all test types
+ test_cases.Initialize(liaisonAddr, now)
+
+ deferFunc = func() {
+ closerLiaisonNode()
+ for _, closeDataNode := range dataNodeClosers {
+ closeDataNode()
+ }
+ _ = server.Close()
+ <-server.StopNotify()
+ spaceDef()
+ }
+
+ suiteConfig := map[string]interface{}{
+ "liaison_addr": liaisonAddr,
+ "etcd_endpoint": etcdEndpoint,
+ "now": now.UnixNano(),
+ }
+
+ configBytes, err := json.Marshal(suiteConfig)
+ Expect(err).NotTo(HaveOccurred())
+ return configBytes
+}, func(configBytes []byte) {
+ var config map[string]interface{}
+ err := json.Unmarshal(configBytes, &config)
+ Expect(err).NotTo(HaveOccurred())
+
+ liaisonAddr = config["liaison_addr"].(string)
+ etcdEndpoint = config["etcd_endpoint"].(string)
+ now = time.Unix(0, int64(config["now"].(float64)))
+
+ var err2 error
+ connection, err2 = grpchelper.Conn(liaisonAddr, 10*time.Second,
+ grpc.WithTransportCredentials(insecure.NewCredentials()))
+ Expect(err2).NotTo(HaveOccurred())
+
+ // Only setup context for measure tests since we're only testing measure replication
+ casesmeasure.SharedContext = helpers.SharedContext{
+ Connection: connection,
+ BaseTime: now,
+ }
+})
+
+var _ = SynchronizedAfterSuite(func() {
+ if connection != nil {
+ Expect(connection.Close()).To(Succeed())
+ }
+}, func() {})
+
+var _ = ReportAfterSuite("Replication Suite", func(report Report) {
+ if report.SuiteSucceeded {
+ if deferFunc != nil {
+ deferFunc()
+ }
+ Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+ Eventually(pool.AllRefsCount, flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
+ }
+})
diff --git a/test/integration/replication/replication_test.go b/test/integration/replication/replication_test.go
new file mode 100644
index 00000000..e2c0f2f7
--- /dev/null
+++ b/test/integration/replication/replication_test.go
@@ -0,0 +1,137 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package replication_test
+
+import (
+ "context"
+ "time"
+
+ g "github.com/onsi/ginkgo/v2"
+ gm "github.com/onsi/gomega"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+
+ commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
+ "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+ casesmeasuredata "github.com/apache/skywalking-banyandb/test/cases/measure/data"
+)
+
+var _ = g.Describe("Replication", func() {
+ var conn *grpc.ClientConn
+
+ g.BeforeEach(func() {
+ var err error
+ conn, err = grpchelper.Conn(liaisonAddr, 10*time.Second,
+ grpc.WithTransportCredentials(insecure.NewCredentials()))
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+ })
+
+ g.AfterEach(func() {
+ if conn != nil {
+ gm.Expect(conn.Close()).To(gm.Succeed())
+ }
+ })
+
+ g.Context("with replicated_group", func() {
+ g.It("should survive node failure", func() {
+ g.By("Verifying the measure exists in replicated_group")
+ ctx := context.Background()
+ measureMetadata := &commonv1.Metadata{
+ Name: "service_traffic",
+ Group: "replicated_group",
+ }
+
+ schemaClient := databasev1.NewMeasureRegistryServiceClient(conn)
+ resp, err := schemaClient.Get(ctx, &databasev1.MeasureRegistryServiceGetRequest{Metadata: measureMetadata})
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+ gm.Expect(resp.GetMeasure()).NotTo(gm.BeNil())
+ gm.Expect(resp.GetMeasure().GetMetadata().GetGroup()).To(gm.Equal("replicated_group"))
+
+ g.By("Getting list of all nodes from etcd (includes data nodes + liaison)")
+ nodePath := "/" + metadata.DefaultNamespace + "/nodes"
+ allNodes, err2 := helpers.ListKeys(etcdEndpoint, nodePath)
+ gm.Expect(err2).NotTo(gm.HaveOccurred())
+
+ // We have: 3 data nodes + 1 liaison node = 4 nodes total
+ gm.Expect(len(allNodes)).To(gm.Equal(4),
+ "Should have 4 nodes total (3 data nodes + 1 liaison node), found %d", len(allNodes))
+
+ g.By("Stopping one data node")
+ // We should have 3 data node closers in dataNodeClosers
+ // Stop the first one
+ // Create a local copy to avoid mutating the package-level slice
+ closersToStop := make([]func(), len(dataNodeClosers))
+ copy(closersToStop, dataNodeClosers)
+ closersToStop[0]()
+
+ // Wait for the cluster to stabilize
+ gm.Eventually(func() int {
+ nodes, err3 := helpers.ListKeys(etcdEndpoint, nodePath)
+ if err3 != nil {
+ return 0
+ }
+ return len(nodes)
+ }, flags.EventuallyTimeout).Should(gm.Equal(3),
+ "Should have 3 nodes total after stopping one data node (2 data nodes + 1 liaison)")
+
+ g.By("Verifying data is still accessible after node failure")
+ verifyDataContentAfterNodeFailure(conn, now)
+
+ g.By("Verifying replication factor")
+ groupClient := databasev1.NewGroupRegistryServiceClient(conn)
+ groupResp, err := groupClient.Get(ctx, &databasev1.GroupRegistryServiceGetRequest{
+ Group: "replicated_group",
+ })
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+ gm.Expect(groupResp.GetGroup()).NotTo(gm.BeNil())
+ gm.Expect(groupResp.GetGroup().GetResourceOpts().GetReplicas()).To(gm.Equal(uint32(2)),
+ "replicated_group should have replicas=2")
+ })
+ })
+})
+
+func verifyDataContentAfterNodeFailure(conn *grpc.ClientConn, baseTime time.Time) {
+ // This verifies that data is still accessible AFTER a node has failed
+
+ // Create a SharedContext like in measure.go
+ sharedContext := helpers.SharedContext{
+ Connection: conn,
+ BaseTime: baseTime,
+ }
+
+ // Create args for the entity_replicated test case
+ args := helpers.Args{
+ Input: "entity_replicated",
+ Duration: 25 * time.Minute,
+ Offset: -20 * time.Minute,
+ DisOrder: true,
+ }
+
+ // This will:
+ // 1. Read entity_replicated.ql and entity_replicated.yaml
+ // 2. Execute the query
+ // 3. Verify results match expected data
+ gm.Eventually(func(innerGm gm.Gomega) {
+ casesmeasuredata.VerifyFn(innerGm, sharedContext, args)
+ }, flags.EventuallyTimeout).Should(gm.Succeed(),
+ "Should be able to query and verify data content after node failure")
+}