This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new bced921 Introduce preparation workflow for tests (#63)
bced921 is described below
commit bced92119340fb6dbe882bbedc67809aad4459cd
Author: Jiajing LU <[email protected]>
AuthorDate: Thu Jan 6 14:29:33 2022 +0800
Introduce preparation workflow for tests (#63)
* add setup workflow
* add setup tests
---
banyand/metadata/schema/etcd.go | 6 +-
banyand/stream/stream_write_test.go | 89 ++++++++++++++++---------
go.mod | 2 +-
go.sum | 3 +-
pkg/test/setup.go | 128 ++++++++++++++++++++++++++++++++++++
pkg/test/setup_test.go | 107 ++++++++++++++++++++++++++++++
pkg/test/stream/etcd.go | 7 --
7 files changed, 301 insertions(+), 41 deletions(-)
diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go
index 9e52b6c..eb145e2 100644
--- a/banyand/metadata/schema/etcd.go
+++ b/banyand/metadata/schema/etcd.go
@@ -46,6 +46,8 @@ var (
ErrEntityNotFound = errors.New("entity is not found")
ErrUnexpectedNumberOfEntities = errors.New("unexpected number of
entities")
+ unixDomainSockScheme = "unix"
+
GroupsKeyPrefix = "/groups/"
GroupMetadataKey = "/__meta_group__"
StreamKeyPrefix = "/streams/"
@@ -64,8 +66,8 @@ func RootDir(rootDir string) RegistryOption {
func randomUnixDomainListener() (string, string) {
i := rand.Uint64()
- return fmt.Sprintf("%s://localhost:%d%06d", "unix", os.Getpid(), i),
- fmt.Sprintf("%s://localhost:%d%06d", "unix", os.Getpid(), i+1)
+ return fmt.Sprintf("%s://localhost:%d%06d", unixDomainSockScheme,
os.Getpid(), i),
+ fmt.Sprintf("%s://localhost:%d%06d", unixDomainSockScheme,
os.Getpid(), i+1)
}
func UseRandomListener() RegistryOption {
diff --git a/banyand/stream/stream_write_test.go
b/banyand/stream/stream_write_test.go
index e4a58fa..689e1df 100644
--- a/banyand/stream/stream_write_test.go
+++ b/banyand/stream/stream_write_test.go
@@ -28,6 +28,7 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
streamv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata"
@@ -184,7 +185,8 @@ func Test_Stream_Write(t *testing.T) {
}
-func setup(t *testing.T) (*stream, func()) {
+func setup(t *testing.T) (*stream, test.StopFunc) {
+ flow := test.NewTestFlow()
req := require.New(t)
req.NoError(logger.Init(logger.Logging{
Env: "dev",
@@ -192,38 +194,65 @@ func setup(t *testing.T) (*stream, func()) {
}))
tempDir, deferFunc := test.Space(req)
- mService, err := metadata.NewService(context.TODO())
- req.NoError(err)
+ var mService metadata.Service
+ var etcdRootDir string
+ var defaultStream *databasev1.Stream
+ var rules []*databasev1.IndexRule
+ var s *stream
- etcdRootDir := teststream.RandomTempDir()
- err = mService.FlagSet().Parse([]string{"--metadata-root-path=" +
etcdRootDir})
- req.NoError(err)
-
- err = mService.PreRun()
- req.NoError(err)
+ flow.
+ PushErrorHandler(func() {
+ deferFunc()
+ }).
+ Run(context.TODO(), func() (err error) {
+ mService, err = metadata.NewService(context.TODO())
+ return
+ }, func() {
+ if mService != nil {
+ mService.GracefulStop()
+ }
+ }).
+ RunWithoutSideEffect(context.TODO(), func() error {
+ etcdRootDir = teststream.RandomTempDir()
+ return
mService.FlagSet().Parse([]string{"--metadata-root-path=" + etcdRootDir})
+ }).
+ Run(context.TODO(), func() error {
+ return mService.PreRun()
+ }, func() {
+ if len(etcdRootDir) > 0 {
+ _ = os.RemoveAll(etcdRootDir)
+ }
+ }).
+ RunWithoutSideEffect(context.TODO(), func() error {
+ return
teststream.PreloadSchema(mService.SchemaRegistry())
+ }).
+ RunWithoutSideEffect(context.TODO(), func() (err error) {
+ defaultStream, err =
mService.StreamRegistry().GetStream(context.TODO(), &commonv1.Metadata{
+ Name: "sw",
+ Group: "default",
+ })
+ return
+ }).
+ RunWithoutSideEffect(context.TODO(), func() (err error) {
+ rules, err = mService.IndexRules(context.TODO(),
defaultStream.GetMetadata())
+ return
+ }).
+ Run(context.TODO(), func() (err error) {
+ sSpec := streamSpec{
+ schema: defaultStream,
+ indexRules: rules,
+ }
+ s, err = openStream(tempDir, sSpec,
logger.GetLogger("test"))
+ return
+ }, func() {
+ if s != nil {
+ _ = s.Close()
+ }
+ })
- err = teststream.PreloadSchema(mService.SchemaRegistry())
- req.NoError(err)
+ req.NoError(flow.Error())
- sa, err := mService.StreamRegistry().GetStream(context.TODO(),
&commonv1.Metadata{
- Name: "sw",
- Group: "default",
- })
- req.NoError(err)
- iRules, err := mService.IndexRules(context.TODO(), sa.Metadata)
- req.NoError(err)
- sSpec := streamSpec{
- schema: sa,
- indexRules: iRules,
- }
- s, err := openStream(tempDir, sSpec, logger.GetLogger("test"))
- req.NoError(err)
- return s, func() {
- _ = s.Close()
- mService.GracefulStop()
- deferFunc()
- _ = os.RemoveAll(etcdRootDir)
- }
+ return s, flow.Shutdown()
}
func getEle(tags ...interface{}) *streamv1.ElementValue {
diff --git a/go.mod b/go.mod
index b2815cb..adda1b0 100644
--- a/go.mod
+++ b/go.mod
@@ -89,7 +89,7 @@ require (
go.opentelemetry.io/otel/sdk/metric v0.20.0 // indirect
go.opentelemetry.io/otel/trace v0.20.0 // indirect
go.opentelemetry.io/proto/otlp v0.7.0 // indirect
- go.uber.org/atomic v1.7.0 // indirect
+ go.uber.org/atomic v1.9.0 // indirect
go.uber.org/zap v1.17.0 // indirect
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 // indirect
golang.org/x/text v0.3.6 // indirect
diff --git a/go.sum b/go.sum
index afb3bff..5b62477 100644
--- a/go.sum
+++ b/go.sum
@@ -482,8 +482,9 @@ go.opentelemetry.io/otel/trace v0.20.0/go.mod
h1:6GjCW8zgDjwGHGa6GkyeB8+/5vjT16g
go.opentelemetry.io/proto/otlp v0.7.0
h1:rwOQPCuKAKmwGKq2aVNnYIibI6wnV7EvzgfTCzcdGg8=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod
h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.uber.org/atomic v1.4.0/go.mod
h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
-go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod
h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
+go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
+go.uber.org/atomic v1.9.0/go.mod
h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
go.uber.org/goleak v1.1.10/go.mod
h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.1.0/go.mod
h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
diff --git a/pkg/test/setup.go b/pkg/test/setup.go
new file mode 100644
index 0000000..978d474
--- /dev/null
+++ b/pkg/test/setup.go
@@ -0,0 +1,128 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package test
+
+import (
+ "context"
+
+ "github.com/pkg/errors"
+ "go.uber.org/multierr"
+)
+
+type StartFunc func() error
+type StopFunc func()
+
+type Flow interface {
+ // PushErrorHandler only pushes a stopFunc
+ PushErrorHandler(StopFunc) Flow
+
+ // Run calls the startFunc and expects no error to be returned.
+ // If a non-nil error is returned or a panic occurs, shutdown must be called at once.
+ // The error will be stored so that it can be checked by the caller later.
+ Run(context.Context, StartFunc, StopFunc) Flow
+
+ // RunWithoutSideEffect calls the startFunc and does not expect any side effects from it.
+ // If a non-nil error is returned or a panic occurs, shutdown must be called at once.
+ // The error will be stored so that it can be checked by the caller later.
+ RunWithoutSideEffect(context.Context, StartFunc) Flow
+
+ // Shutdown does not actually shut down the flow,
+ // but only returns a function containing all stopFunc(s) to be executed.
+ // The timing to do the real shutdown can be determined by users.
+ Shutdown() StopFunc
+
+ // Error returns all errors returned from startFunc(s).
+ // A nil error implies a successful flow.
+ Error() error
+}
+
+type testFlow struct {
+ err error
+ stopFuncs []StopFunc
+}
+
+// NewTestFlow creates a flow ready to prepare services/components to be used
for testing.
+func NewTestFlow() Flow {
+ return &testFlow{
+ stopFuncs: make([]StopFunc, 0),
+ }
+}
+
+// Shutdown does not actually call the shutdown functions but only returns a composition of all
+// stop functions given by the user, thus allowing the user to determine the timing.
+func (tf *testFlow) Shutdown() StopFunc {
+ return func() {
+ for idx := len(tf.stopFuncs) - 1; idx >= 0; idx-- {
+ tf.stopFuncs[idx]()
+ }
+ }
+}
+
+func (tf *testFlow) RunWithoutSideEffect(ctx context.Context, startFunc
StartFunc) Flow {
+ return tf.Run(ctx, startFunc, nil)
+}
+
+func (tf *testFlow) Run(ctx context.Context, startFunc StartFunc, stopFunc
StopFunc) Flow {
+ if tf.err != nil {
+ return tf
+ }
+
+ if stopFunc != nil {
+ tf.stopFuncs = append(tf.stopFuncs, stopFunc)
+ }
+
+ errCh := make(chan error)
+ defer func() {
+ close(errCh)
+ }()
+
+ donec := make(chan struct{})
+ // start a new goroutine in order to recover from panic
+ go func() {
+ defer func() {
+ if r := recover(); r != nil {
+ errCh <- errors.Errorf("panic found %v", r)
+ close(donec)
+ }
+ }()
+ err := startFunc()
+
+ if err != nil {
+ errCh <- err
+ }
+ close(donec)
+ }()
+ select {
+ case <-donec:
+ case err := <-errCh:
+ tf.err = multierr.Append(tf.err, err)
+ tf.Shutdown()()
+ return tf
+ }
+
+ return tf
+}
+
+func (tf *testFlow) PushErrorHandler(stopFunc StopFunc) Flow {
+ tf.stopFuncs = append(tf.stopFuncs, stopFunc)
+ return tf
+}
+
+func (tf *testFlow) Error() error {
+ return tf.err
+}
diff --git a/pkg/test/setup_test.go b/pkg/test/setup_test.go
new file mode 100644
index 0000000..c66155c
--- /dev/null
+++ b/pkg/test/setup_test.go
@@ -0,0 +1,107 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package test
+
+import (
+ "context"
+ "errors"
+ "sync"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+)
+
+func Test_Setup_Single_Error(t *testing.T) {
+ require := require.New(t)
+ wg := sync.WaitGroup{}
+ flow := NewTestFlow().
+ Run(context.TODO(), func() error {
+ wg.Add(1)
+ return errors.New("normal error")
+ }, func() {
+ wg.Done()
+ })
+
+ wg.Wait()
+
+ require.Error(flow.Error())
+}
+
+func Test_Setup_Multiple_ErrorHandlers(t *testing.T) {
+ require := require.New(t)
+ wg := sync.WaitGroup{}
+ flow := NewTestFlow().
+ Run(context.TODO(), func() error {
+ wg.Add(1)
+ return nil
+ }, func() {
+ wg.Done()
+ }).
+ Run(context.TODO(), func() error {
+ wg.Add(1)
+ return errors.New("normal error")
+ }, func() {
+ wg.Done()
+ })
+
+ wg.Wait()
+
+ require.Error(flow.Error())
+}
+
+func Test_Setup_Panic(t *testing.T) {
+ require := require.New(t)
+ wg := sync.WaitGroup{}
+ flow := NewTestFlow().
+ Run(context.TODO(), func() error {
+ wg.Add(1)
+ panic("oops...")
+ }, func() {
+ wg.Done()
+ })
+
+ wg.Wait()
+
+ require.Error(flow.Error())
+}
+
+func Test_Setup_Shutdown(t *testing.T) {
+ require := require.New(t)
+ wg := sync.WaitGroup{}
+ flow := NewTestFlow().
+ Run(context.TODO(), func() error {
+ wg.Add(1)
+ return nil
+ }, func() {
+ wg.Done()
+ }).
+ Run(context.TODO(), func() error {
+ wg.Add(1)
+ return nil
+ }, func() {
+ wg.Done()
+ })
+
+ go func() {
+ flow.Shutdown()()
+ }()
+
+ wg.Wait()
+
+ require.NoError(flow.Error())
+}
diff --git a/pkg/test/stream/etcd.go b/pkg/test/stream/etcd.go
index 2f2c89f..442bb2a 100644
--- a/pkg/test/stream/etcd.go
+++ b/pkg/test/stream/etcd.go
@@ -21,7 +21,6 @@ import (
"context"
"embed"
"fmt"
- "math/rand"
"os"
"path"
@@ -88,9 +87,3 @@ func PreloadSchema(e schema.Registry) error {
func RandomTempDir() string {
return path.Join(os.TempDir(), fmt.Sprintf("banyandb-embed-etcd-%s",
uuid.New().String()))
}
-
-func RandomUnixDomainListener() (string, string) {
- i := rand.Uint64()
- return fmt.Sprintf("%s://localhost:%d%06d", "unix", os.Getpid(), i),
- fmt.Sprintf("%s://localhost:%d%06d", "unix", os.Getpid(), i+1)
-}