This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 0446ae1a Add Replication Tests to Index Mode Measures (#893)
0446ae1a is described below

commit 0446ae1a509894742cd0e57778b9f418411e29ef
Author: Trista Pan <[email protected]>
AuthorDate: Wed Jan 21 21:26:57 2026 +0800

    Add Replication Tests to Index Mode Measures (#893)
    
    * Add an integration test: replication for measure
    ---------
    
    Co-authored-by: Copilot <[email protected]>
    Co-authored-by: 吴晟 Wu Sheng <[email protected]>
    Co-authored-by: Gao Hongtao <[email protected]>
---
 CHANGES.md                                         |   1 +
 .../measure/testdata/group_stages/replicated.json  |  34 ++++
 pkg/test/measure/testdata/groups/replicated.json   |  19 +++
 .../measures/service_traffic_replicated.json       |  44 +++++
 test/cases/init.go                                 |   1 +
 test/cases/measure/data/input/entity_replicated.ql |  21 +++
 .../measure/data/input/entity_replicated.yaml      |  30 ++++
 .../cases/measure/data/want/entity_replicated.yaml |  37 +++++
 .../replication/replication_suite_test.go          | 182 +++++++++++++++++++++
 test/integration/replication/replication_test.go   | 137 ++++++++++++++++
 10 files changed, 506 insertions(+)

diff --git a/CHANGES.md b/CHANGES.md
index 2232be11..f7a9d528 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -16,6 +16,7 @@ Release Notes.
 - Support writing data with specifications.
 - Persist series metadata in liaison queue for measure, stream and trace 
models.
 - Update the dump tool to support analyzing the parts with smeta files.
+- Add replication integration test for measure.
 - Activate the property repair mechanism by default.
 - Add snapshot time retention policy to ensure the snapshot only can be 
deleted after the configured minimum age(time).
 
diff --git a/pkg/test/measure/testdata/group_stages/replicated.json 
b/pkg/test/measure/testdata/group_stages/replicated.json
new file mode 100644
index 00000000..5d07b9ab
--- /dev/null
+++ b/pkg/test/measure/testdata/group_stages/replicated.json
@@ -0,0 +1,34 @@
+{
+  "metadata": {
+    "name": "replicated_group"
+  },
+  "catalog": "CATALOG_MEASURE",
+  "resource_opts": {
+    "shard_num": 2,
+    "segment_interval": {
+      "unit": "UNIT_DAY",
+      "num": 1
+    },
+    "ttl": {
+      "unit": "UNIT_DAY",
+      "num": 7
+    },
+    "replicas": 2,
+    "stages": [
+      {
+        "name": "warm",
+        "shard_num": 1,
+        "segment_interval": {
+          "unit": "UNIT_DAY",
+          "num": 3
+        },
+        "ttl": {
+          "unit": "UNIT_DAY",
+          "num": 7
+        },
+        "node_selector": "type=warm"
+      }
+    ]
+  },
+  "updated_at": "2021-04-15T01:30:15.01Z"
+}
diff --git a/pkg/test/measure/testdata/groups/replicated.json 
b/pkg/test/measure/testdata/groups/replicated.json
new file mode 100644
index 00000000..98c30b4d
--- /dev/null
+++ b/pkg/test/measure/testdata/groups/replicated.json
@@ -0,0 +1,19 @@
+{
+  "metadata": {
+    "name": "replicated_group"
+  },
+  "catalog": "CATALOG_MEASURE",
+  "resource_opts": {
+    "shard_num": 2,
+    "segment_interval": {
+      "unit": "UNIT_DAY",
+      "num": 1
+    },
+    "ttl": {
+      "unit": "UNIT_DAY",
+      "num": 7
+    },
+    "replicas": 2
+  },
+  "updated_at": "2021-04-15T01:30:15.01Z"
+}
diff --git a/pkg/test/measure/testdata/measures/service_traffic_replicated.json 
b/pkg/test/measure/testdata/measures/service_traffic_replicated.json
new file mode 100644
index 00000000..73082adb
--- /dev/null
+++ b/pkg/test/measure/testdata/measures/service_traffic_replicated.json
@@ -0,0 +1,44 @@
+{
+  "metadata": {
+    "name": "service_traffic",
+    "group": "replicated_group"
+  },
+  "tag_families": [
+    {
+      "name": "default",
+      "tags": [
+        {
+          "name": "id",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "service_id",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "name",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "short_name",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "service_group",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "layer",
+          "type": "TAG_TYPE_INT"
+        }
+      ]
+    }
+  ],
+  "entity": {
+    "tag_names": [
+      "id"
+    ]
+  },
+  "index_mode": true,
+  "updated_at": "2021-04-15T01:30:15.01Z"
+}
diff --git a/test/cases/init.go b/test/cases/init.go
index 71f3f520..677e994b 100644
--- a/test/cases/init.go
+++ b/test/cases/init.go
@@ -83,6 +83,7 @@ func Initialize(addr string, now time.Time) {
        interval = time.Minute
        casesmeasuredata.Write(conn, "service_traffic", "index_mode", 
"service_traffic_data_old.json", now.AddDate(0, 0, -2), interval)
        casesmeasuredata.Write(conn, "service_traffic", "index_mode", 
"service_traffic_data.json", now, interval)
+       casesmeasuredata.Write(conn, "service_traffic", "replicated_group", 
"service_traffic_data.json", now, interval)
        casesmeasuredata.Write(conn, "service_instance_traffic", "sw_metric", 
"service_instance_traffic_data.json", now, interval)
        casesmeasuredata.Write(conn, "service_cpm_minute", "sw_metric", 
"service_cpm_minute_data.json", now, interval)
        casesmeasuredata.Write(conn, "instance_clr_cpu_minute", "sw_metric", 
"instance_clr_cpu_minute_data.json", now, interval)
diff --git a/test/cases/measure/data/input/entity_replicated.ql 
b/test/cases/measure/data/input/entity_replicated.ql
new file mode 100644
index 00000000..6d915944
--- /dev/null
+++ b/test/cases/measure/data/input/entity_replicated.ql
@@ -0,0 +1,21 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+SELECT service_id, layer, name, short_name FROM MEASURE service_traffic IN 
replicated_group
+TIME > '-15m'
+WHERE id = '1'
diff --git a/test/cases/measure/data/input/entity_replicated.yaml 
b/test/cases/measure/data/input/entity_replicated.yaml
new file mode 100644
index 00000000..13454431
--- /dev/null
+++ b/test/cases/measure/data/input/entity_replicated.yaml
@@ -0,0 +1,30 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "service_traffic"
+groups: ["replicated_group"]
+tagProjection:
+  tagFamilies:
+  - name: "default"
+    tags: ["service_id", "layer", "name", "short_name"]
+criteria:
+  condition:
+    name: "id"
+    op: "BINARY_OP_EQ"
+    value:
+      str:
+        value: "1"
diff --git a/test/cases/measure/data/want/entity_replicated.yaml 
b/test/cases/measure/data/want/entity_replicated.yaml
new file mode 100644
index 00000000..4fc05dc5
--- /dev/null
+++ b/test/cases/measure/data/want/entity_replicated.yaml
@@ -0,0 +1,37 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+dataPoints:
+- tagFamilies:
+  - name: default
+    tags:
+    - key: service_id
+      value:
+        str:
+          value: service_1
+    - key: layer
+      value:
+        int:
+          value: "1"
+    - key: name
+      value:
+        str:
+          value: service_name_1
+    - key: short_name
+      value:
+        str:
+          value: service_short_name_1
diff --git a/test/integration/replication/replication_suite_test.go 
b/test/integration/replication/replication_suite_test.go
new file mode 100644
index 00000000..b4a88c2e
--- /dev/null
+++ b/test/integration/replication/replication_suite_test.go
@@ -0,0 +1,182 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package replication_test
+
+import (
+       "context"
+       "encoding/json"
+       "fmt"
+       "testing"
+       "time"
+
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+       "github.com/onsi/gomega/gleak"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
+       "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+       test_measure "github.com/apache/skywalking-banyandb/pkg/test/measure"
+       test_property "github.com/apache/skywalking-banyandb/pkg/test/property"
+       "github.com/apache/skywalking-banyandb/pkg/test/setup"
+       test_stream "github.com/apache/skywalking-banyandb/pkg/test/stream"
+       test_trace "github.com/apache/skywalking-banyandb/pkg/test/trace"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+       test_cases "github.com/apache/skywalking-banyandb/test/cases"
+       casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
+)
+
+func TestReplication(t *testing.T) {
+       RegisterFailHandler(Fail)
+       RunSpecs(t, "Replication Suite")
+}
+
+var (
+       deferFunc       func()
+       goods           []gleak.Goroutine
+       now             time.Time
+       connection      *grpc.ClientConn
+       liaisonAddr     string
+       etcdEndpoint    string
+       dataNodeClosers []func()
+)
+
+var _ = SynchronizedBeforeSuite(func() []byte {
+       Expect(logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: flags.LogLevel,
+       })).To(Succeed())
+       pool.EnableStackTracking(true)
+       goods = gleak.Goroutines()
+
+       By("Starting etcd server")
+       ports, err := test.AllocateFreePorts(2)
+       Expect(err).NotTo(HaveOccurred())
+       dir, spaceDef, err := test.NewSpace()
+       Expect(err).NotTo(HaveOccurred())
+       ep := fmt.Sprintf("http://127.0.0.1:%d";, ports[0])
+       etcdEndpoint = ep
+
+       server, err := embeddedetcd.NewServer(
+               embeddedetcd.ConfigureListener([]string{ep}, 
[]string{fmt.Sprintf("http://127.0.0.1:%d";, ports[1])}),
+               embeddedetcd.RootDir(dir),
+               embeddedetcd.AutoCompactionMode("periodic"),
+               embeddedetcd.AutoCompactionRetention("1h"),
+               embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+       )
+       Expect(err).ShouldNot(HaveOccurred())
+       <-server.ReadyNotify()
+
+       By("Loading schema")
+       schemaRegistry, err := schema.NewEtcdSchemaRegistry(
+               schema.Namespace(metadata.DefaultNamespace),
+               schema.ConfigureServerEndpoints([]string{ep}),
+       )
+       Expect(err).NotTo(HaveOccurred())
+       defer schemaRegistry.Close()
+
+       ctx := context.Background()
+       // Preload all schemas since test_cases.Initialize needs them
+       test_stream.PreloadSchema(ctx, schemaRegistry)
+       test_measure.PreloadSchema(ctx, schemaRegistry)
+       test_trace.PreloadSchema(ctx, schemaRegistry)
+       test_property.PreloadSchema(ctx, schemaRegistry)
+
+       By("Starting 3 data nodes for replication test")
+       dataNodeClosers = make([]func(), 0, 3)
+
+       for i := 0; i < 3; i++ {
+               closeDataNode := setup.DataNode(ep)
+               dataNodeClosers = append(dataNodeClosers, closeDataNode)
+       }
+
+       By("Starting liaison node")
+       liaisonAddr2, closerLiaisonNode := setup.LiaisonNode(ep)
+       liaisonAddr = liaisonAddr2
+
+       By("Initializing test cases")
+       ns := timestamp.NowMilli().UnixNano()
+       now = time.Unix(0, ns-ns%int64(time.Minute))
+
+       // Initialize test data - this loads data for all test types
+       test_cases.Initialize(liaisonAddr, now)
+
+       deferFunc = func() {
+               closerLiaisonNode()
+               for _, closeDataNode := range dataNodeClosers {
+                       closeDataNode()
+               }
+               _ = server.Close()
+               <-server.StopNotify()
+               spaceDef()
+       }
+
+       suiteConfig := map[string]interface{}{
+               "liaison_addr":  liaisonAddr,
+               "etcd_endpoint": etcdEndpoint,
+               "now":           now.UnixNano(),
+       }
+
+       configBytes, err := json.Marshal(suiteConfig)
+       Expect(err).NotTo(HaveOccurred())
+       return configBytes
+}, func(configBytes []byte) {
+       var config map[string]interface{}
+       err := json.Unmarshal(configBytes, &config)
+       Expect(err).NotTo(HaveOccurred())
+
+       liaisonAddr = config["liaison_addr"].(string)
+       etcdEndpoint = config["etcd_endpoint"].(string)
+       now = time.Unix(0, int64(config["now"].(float64)))
+
+       var err2 error
+       connection, err2 = grpchelper.Conn(liaisonAddr, 10*time.Second,
+               grpc.WithTransportCredentials(insecure.NewCredentials()))
+       Expect(err2).NotTo(HaveOccurred())
+
+       // Only setup context for measure tests since we're only testing 
measure replication
+       casesmeasure.SharedContext = helpers.SharedContext{
+               Connection: connection,
+               BaseTime:   now,
+       }
+})
+
+var _ = SynchronizedAfterSuite(func() {
+       if connection != nil {
+               Expect(connection.Close()).To(Succeed())
+       }
+}, func() {})
+
+var _ = ReportAfterSuite("Replication Suite", func(report Report) {
+       if report.SuiteSucceeded {
+               if deferFunc != nil {
+                       deferFunc()
+               }
+               Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+               Eventually(pool.AllRefsCount, 
flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
+       }
+})
diff --git a/test/integration/replication/replication_test.go 
b/test/integration/replication/replication_test.go
new file mode 100644
index 00000000..e2c0f2f7
--- /dev/null
+++ b/test/integration/replication/replication_test.go
@@ -0,0 +1,137 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package replication_test
+
+import (
+       "context"
+       "time"
+
+       g "github.com/onsi/ginkgo/v2"
+       gm "github.com/onsi/gomega"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+       casesmeasuredata 
"github.com/apache/skywalking-banyandb/test/cases/measure/data"
+)
+
+var _ = g.Describe("Replication", func() {
+       var conn *grpc.ClientConn
+
+       g.BeforeEach(func() {
+               var err error
+               conn, err = grpchelper.Conn(liaisonAddr, 10*time.Second,
+                       
grpc.WithTransportCredentials(insecure.NewCredentials()))
+               gm.Expect(err).NotTo(gm.HaveOccurred())
+       })
+
+       g.AfterEach(func() {
+               if conn != nil {
+                       gm.Expect(conn.Close()).To(gm.Succeed())
+               }
+       })
+
+       g.Context("with replicated_group", func() {
+               g.It("should survive node failure", func() {
+                       g.By("Verifying the measure exists in replicated_group")
+                       ctx := context.Background()
+                       measureMetadata := &commonv1.Metadata{
+                               Name:  "service_traffic",
+                               Group: "replicated_group",
+                       }
+
+                       schemaClient := 
databasev1.NewMeasureRegistryServiceClient(conn)
+                       resp, err := schemaClient.Get(ctx, 
&databasev1.MeasureRegistryServiceGetRequest{Metadata: measureMetadata})
+                       gm.Expect(err).NotTo(gm.HaveOccurred())
+                       gm.Expect(resp.GetMeasure()).NotTo(gm.BeNil())
+                       
gm.Expect(resp.GetMeasure().GetMetadata().GetGroup()).To(gm.Equal("replicated_group"))
+
+                       g.By("Getting list of all nodes from etcd (includes 
data nodes + liaison)")
+                       nodePath := "/" + metadata.DefaultNamespace + "/nodes"
+                       allNodes, err2 := helpers.ListKeys(etcdEndpoint, 
nodePath)
+                       gm.Expect(err2).NotTo(gm.HaveOccurred())
+
+                       // We have: 3 data nodes + 1 liaison node = 4 nodes 
total
+                       gm.Expect(len(allNodes)).To(gm.Equal(4),
+                               "Should have 4 nodes total (3 data nodes + 1 
liaison node), found %d", len(allNodes))
+
+                       g.By("Stopping one data node")
+                       // We should have 3 data node closers in dataNodeClosers
+                       // Stop the first one
+                       // Create a local copy to avoid mutating the 
package-level slice
+                       closersToStop := make([]func(), len(dataNodeClosers))
+                       copy(closersToStop, dataNodeClosers)
+                       closersToStop[0]()
+
+                       // Wait for the cluster to stabilize
+                       gm.Eventually(func() int {
+                               nodes, err3 := helpers.ListKeys(etcdEndpoint, 
nodePath)
+                               if err3 != nil {
+                                       return 0
+                               }
+                               return len(nodes)
+                       }, flags.EventuallyTimeout).Should(gm.Equal(3),
+                               "Should have 3 nodes total after stopping one 
data node (2 data nodes + 1 liaison)")
+
+                       g.By("Verifying data is still accessible after node 
failure")
+                       verifyDataContentAfterNodeFailure(conn, now)
+
+                       g.By("Verifying replication factor")
+                       groupClient := 
databasev1.NewGroupRegistryServiceClient(conn)
+                       groupResp, err := groupClient.Get(ctx, 
&databasev1.GroupRegistryServiceGetRequest{
+                               Group: "replicated_group",
+                       })
+                       gm.Expect(err).NotTo(gm.HaveOccurred())
+                       gm.Expect(groupResp.GetGroup()).NotTo(gm.BeNil())
+                       
gm.Expect(groupResp.GetGroup().GetResourceOpts().GetReplicas()).To(gm.Equal(uint32(2)),
+                               "replicated_group should have replicas=2")
+               })
+       })
+})
+
+func verifyDataContentAfterNodeFailure(conn *grpc.ClientConn, baseTime 
time.Time) {
+       // This verifies that data is still accessible AFTER a node has failed
+
+       // Create a SharedContext like in measure.go
+       sharedContext := helpers.SharedContext{
+               Connection: conn,
+               BaseTime:   baseTime,
+       }
+
+       // Create args for the entity_replicated test case
+       args := helpers.Args{
+               Input:    "entity_replicated",
+               Duration: 25 * time.Minute,
+               Offset:   -20 * time.Minute,
+               DisOrder: true,
+       }
+
+       // This will:
+       // 1. Read entity_replicated.ql and entity_replicated.yaml
+       // 2. Execute the query
+       // 3. Verify results match expected data
+       gm.Eventually(func(innerGm gm.Gomega) {
+               casesmeasuredata.VerifyFn(innerGm, sharedContext, args)
+       }, flags.EventuallyTimeout).Should(gm.Succeed(),
+               "Should be able to query and verify data content after node 
failure")
+}

Reply via email to