This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new bced921  Introduce preparation workflow for tests (#63)
bced921 is described below

commit bced92119340fb6dbe882bbedc67809aad4459cd
Author: Jiajing LU <[email protected]>
AuthorDate: Thu Jan 6 14:29:33 2022 +0800

    Introduce preparation workflow for tests (#63)
    
    * add setup workflow
    
    * add setup tests
---
 banyand/metadata/schema/etcd.go     |   6 +-
 banyand/stream/stream_write_test.go |  89 ++++++++++++++++---------
 go.mod                              |   2 +-
 go.sum                              |   3 +-
 pkg/test/setup.go                   | 128 ++++++++++++++++++++++++++++++++++++
 pkg/test/setup_test.go              | 107 ++++++++++++++++++++++++++++++
 pkg/test/stream/etcd.go             |   7 --
 7 files changed, 301 insertions(+), 41 deletions(-)

diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go
index 9e52b6c..eb145e2 100644
--- a/banyand/metadata/schema/etcd.go
+++ b/banyand/metadata/schema/etcd.go
@@ -46,6 +46,8 @@ var (
        ErrEntityNotFound             = errors.New("entity is not found")
        ErrUnexpectedNumberOfEntities = errors.New("unexpected number of 
entities")
 
+       unixDomainSockScheme = "unix"
+
        GroupsKeyPrefix           = "/groups/"
        GroupMetadataKey          = "/__meta_group__"
        StreamKeyPrefix           = "/streams/"
@@ -64,8 +66,8 @@ func RootDir(rootDir string) RegistryOption {
 
 func randomUnixDomainListener() (string, string) {
        i := rand.Uint64()
-       return fmt.Sprintf("%s://localhost:%d%06d", "unix", os.Getpid(), i),
-               fmt.Sprintf("%s://localhost:%d%06d", "unix", os.Getpid(), i+1)
+       return fmt.Sprintf("%s://localhost:%d%06d", unixDomainSockScheme, 
os.Getpid(), i),
+               fmt.Sprintf("%s://localhost:%d%06d", unixDomainSockScheme, 
os.Getpid(), i+1)
 }
 
 func UseRandomListener() RegistryOption {
diff --git a/banyand/stream/stream_write_test.go 
b/banyand/stream/stream_write_test.go
index e4a58fa..689e1df 100644
--- a/banyand/stream/stream_write_test.go
+++ b/banyand/stream/stream_write_test.go
@@ -28,6 +28,7 @@ import (
        "google.golang.org/protobuf/types/known/timestamppb"
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
@@ -184,7 +185,8 @@ func Test_Stream_Write(t *testing.T) {
 
 }
 
-func setup(t *testing.T) (*stream, func()) {
+func setup(t *testing.T) (*stream, test.StopFunc) {
+       flow := test.NewTestFlow()
        req := require.New(t)
        req.NoError(logger.Init(logger.Logging{
                Env:   "dev",
@@ -192,38 +194,65 @@ func setup(t *testing.T) (*stream, func()) {
        }))
        tempDir, deferFunc := test.Space(req)
 
-       mService, err := metadata.NewService(context.TODO())
-       req.NoError(err)
+       var mService metadata.Service
+       var etcdRootDir string
+       var defaultStream *databasev1.Stream
+       var rules []*databasev1.IndexRule
+       var s *stream
 
-       etcdRootDir := teststream.RandomTempDir()
-       err = mService.FlagSet().Parse([]string{"--metadata-root-path=" + 
etcdRootDir})
-       req.NoError(err)
-
-       err = mService.PreRun()
-       req.NoError(err)
+       flow.
+               PushErrorHandler(func() {
+                       deferFunc()
+               }).
+               Run(context.TODO(), func() (err error) {
+                       mService, err = metadata.NewService(context.TODO())
+                       return
+               }, func() {
+                       if mService != nil {
+                               mService.GracefulStop()
+                       }
+               }).
+               RunWithoutSideEffect(context.TODO(), func() error {
+                       etcdRootDir = teststream.RandomTempDir()
+                       return 
mService.FlagSet().Parse([]string{"--metadata-root-path=" + etcdRootDir})
+               }).
+               Run(context.TODO(), func() error {
+                       return mService.PreRun()
+               }, func() {
+                       if len(etcdRootDir) > 0 {
+                               _ = os.RemoveAll(etcdRootDir)
+                       }
+               }).
+               RunWithoutSideEffect(context.TODO(), func() error {
+                       return 
teststream.PreloadSchema(mService.SchemaRegistry())
+               }).
+               RunWithoutSideEffect(context.TODO(), func() (err error) {
+                       defaultStream, err = 
mService.StreamRegistry().GetStream(context.TODO(), &commonv1.Metadata{
+                               Name:  "sw",
+                               Group: "default",
+                       })
+                       return
+               }).
+               RunWithoutSideEffect(context.TODO(), func() (err error) {
+                       rules, err = mService.IndexRules(context.TODO(), 
defaultStream.GetMetadata())
+                       return
+               }).
+               Run(context.TODO(), func() (err error) {
+                       sSpec := streamSpec{
+                               schema:     defaultStream,
+                               indexRules: rules,
+                       }
+                       s, err = openStream(tempDir, sSpec, 
logger.GetLogger("test"))
+                       return
+               }, func() {
+                       if s != nil {
+                               _ = s.Close()
+                       }
+               })
 
-       err = teststream.PreloadSchema(mService.SchemaRegistry())
-       req.NoError(err)
+       req.NoError(flow.Error())
 
-       sa, err := mService.StreamRegistry().GetStream(context.TODO(), 
&commonv1.Metadata{
-               Name:  "sw",
-               Group: "default",
-       })
-       req.NoError(err)
-       iRules, err := mService.IndexRules(context.TODO(), sa.Metadata)
-       req.NoError(err)
-       sSpec := streamSpec{
-               schema:     sa,
-               indexRules: iRules,
-       }
-       s, err := openStream(tempDir, sSpec, logger.GetLogger("test"))
-       req.NoError(err)
-       return s, func() {
-               _ = s.Close()
-               mService.GracefulStop()
-               deferFunc()
-               _ = os.RemoveAll(etcdRootDir)
-       }
+       return s, flow.Shutdown()
 }
 
 func getEle(tags ...interface{}) *streamv1.ElementValue {
diff --git a/go.mod b/go.mod
index b2815cb..adda1b0 100644
--- a/go.mod
+++ b/go.mod
@@ -89,7 +89,7 @@ require (
        go.opentelemetry.io/otel/sdk/metric v0.20.0 // indirect
        go.opentelemetry.io/otel/trace v0.20.0 // indirect
        go.opentelemetry.io/proto/otlp v0.7.0 // indirect
-       go.uber.org/atomic v1.7.0 // indirect
+       go.uber.org/atomic v1.9.0 // indirect
        go.uber.org/zap v1.17.0 // indirect
        golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 // indirect
        golang.org/x/text v0.3.6 // indirect
diff --git a/go.sum b/go.sum
index afb3bff..5b62477 100644
--- a/go.sum
+++ b/go.sum
@@ -482,8 +482,9 @@ go.opentelemetry.io/otel/trace v0.20.0/go.mod 
h1:6GjCW8zgDjwGHGa6GkyeB8+/5vjT16g
 go.opentelemetry.io/proto/otlp v0.7.0 
h1:rwOQPCuKAKmwGKq2aVNnYIibI6wnV7EvzgfTCzcdGg8=
 go.opentelemetry.io/proto/otlp v0.7.0/go.mod 
h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
 go.uber.org/atomic v1.4.0/go.mod 
h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
-go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
 go.uber.org/atomic v1.7.0/go.mod 
h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
+go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
+go.uber.org/atomic v1.9.0/go.mod 
h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
 go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
 go.uber.org/goleak v1.1.10/go.mod 
h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
 go.uber.org/multierr v1.1.0/go.mod 
h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
diff --git a/pkg/test/setup.go b/pkg/test/setup.go
new file mode 100644
index 0000000..978d474
--- /dev/null
+++ b/pkg/test/setup.go
@@ -0,0 +1,128 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package test
+
+import (
+       "context"
+
+       "github.com/pkg/errors"
+       "go.uber.org/multierr"
+)
+
+type StartFunc func() error
+type StopFunc func()
+
+type Flow interface {
+       // PushErrorHandler only pushes a stopFunc
+       PushErrorHandler(StopFunc) Flow
+
+       // Run calls the startFunc and expects no error returned.
+       // If a non-nil error is returned or panic, shutdown must be called at 
once.
+       // The error will be stored and thus could be checked by the caller 
later.
+       Run(context.Context, StartFunc, StopFunc) Flow
+
+       // RunWithoutSideEffect calls the startFunc and does not expect any 
side effects from it.
+       // If a non-nil error is returned or panic, shutdown must be called at 
once.
+       // The error will be stored and thus could be checked by the caller 
later.
+       RunWithoutSideEffect(context.Context, StartFunc) Flow
+
+       // Shutdown does not actually shutdown the flow,
+       // but only returns a function containing all stopFunc(s) to be 
executed.
+       // The timing to do the real shutdown can be determined by users.
+       Shutdown() StopFunc
+
+       // Error returns all errors returned from startFunc(s)
+       // Nil error imply a successful flow.
+       Error() error
+}
+
+type testFlow struct {
+       err       error
+       stopFuncs []StopFunc
+}
+
+// NewTestFlow creates a flow ready to prepare services/components to be used 
for testing.
+func NewTestFlow() Flow {
+       return &testFlow{
+               stopFuncs: make([]StopFunc, 0),
+       }
+}
+
+// Shutdown does not actually call the shutdown functions but only return a 
composition of all
+// stop functions given by the user thus it allows the user to determine the 
timing.
+func (tf *testFlow) Shutdown() StopFunc {
+       return func() {
+               for idx := len(tf.stopFuncs) - 1; idx >= 0; idx-- {
+                       tf.stopFuncs[idx]()
+               }
+       }
+}
+
+func (tf *testFlow) RunWithoutSideEffect(ctx context.Context, startFunc 
StartFunc) Flow {
+       return tf.Run(ctx, startFunc, nil)
+}
+
+func (tf *testFlow) Run(ctx context.Context, startFunc StartFunc, stopFunc 
StopFunc) Flow {
+       if tf.err != nil {
+               return tf
+       }
+
+       if stopFunc != nil {
+               tf.stopFuncs = append(tf.stopFuncs, stopFunc)
+       }
+
+       errCh := make(chan error)
+       defer func() {
+               close(errCh)
+       }()
+
+       donec := make(chan struct{})
+       // start a new goroutine in order to recover from panic
+       go func() {
+               defer func() {
+                       if r := recover(); r != nil {
+                               errCh <- errors.Errorf("panic found %v", r)
+                               close(donec)
+                       }
+               }()
+               err := startFunc()
+
+               if err != nil {
+                       errCh <- err
+               }
+               close(donec)
+       }()
+       select {
+       case <-donec:
+       case err := <-errCh:
+               tf.err = multierr.Append(tf.err, err)
+               tf.Shutdown()()
+               return tf
+       }
+
+       return tf
+}
+
+func (tf *testFlow) PushErrorHandler(stopFunc StopFunc) Flow {
+       tf.stopFuncs = append(tf.stopFuncs, stopFunc)
+       return tf
+}
+
+func (tf *testFlow) Error() error {
+       return tf.err
+}
diff --git a/pkg/test/setup_test.go b/pkg/test/setup_test.go
new file mode 100644
index 0000000..c66155c
--- /dev/null
+++ b/pkg/test/setup_test.go
@@ -0,0 +1,107 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package test
+
+import (
+       "context"
+       "errors"
+       "sync"
+       "testing"
+
+       "github.com/stretchr/testify/require"
+)
+
+func Test_Setup_Single_Error(t *testing.T) {
+       require := require.New(t)
+       wg := sync.WaitGroup{}
+       flow := NewTestFlow().
+               Run(context.TODO(), func() error {
+                       wg.Add(1)
+                       return errors.New("normal error")
+               }, func() {
+                       wg.Done()
+               })
+
+       wg.Wait()
+
+       require.Error(flow.Error())
+}
+
+func Test_Setup_Multiple_ErrorHandlers(t *testing.T) {
+       require := require.New(t)
+       wg := sync.WaitGroup{}
+       flow := NewTestFlow().
+               Run(context.TODO(), func() error {
+                       wg.Add(1)
+                       return nil
+               }, func() {
+                       wg.Done()
+               }).
+               Run(context.TODO(), func() error {
+                       wg.Add(1)
+                       return errors.New("normal error")
+               }, func() {
+                       wg.Done()
+               })
+
+       wg.Wait()
+
+       require.Error(flow.Error())
+}
+
+func Test_Setup_Panic(t *testing.T) {
+       require := require.New(t)
+       wg := sync.WaitGroup{}
+       flow := NewTestFlow().
+               Run(context.TODO(), func() error {
+                       wg.Add(1)
+                       panic("oops...")
+               }, func() {
+                       wg.Done()
+               })
+
+       wg.Wait()
+
+       require.Error(flow.Error())
+}
+
+func Test_Setup_Shutdown(t *testing.T) {
+       require := require.New(t)
+       wg := sync.WaitGroup{}
+       flow := NewTestFlow().
+               Run(context.TODO(), func() error {
+                       wg.Add(1)
+                       return nil
+               }, func() {
+                       wg.Done()
+               }).
+               Run(context.TODO(), func() error {
+                       wg.Add(1)
+                       return nil
+               }, func() {
+                       wg.Done()
+               })
+
+       go func() {
+               flow.Shutdown()()
+       }()
+
+       wg.Wait()
+
+       require.NoError(flow.Error())
+}
diff --git a/pkg/test/stream/etcd.go b/pkg/test/stream/etcd.go
index 2f2c89f..442bb2a 100644
--- a/pkg/test/stream/etcd.go
+++ b/pkg/test/stream/etcd.go
@@ -21,7 +21,6 @@ import (
        "context"
        "embed"
        "fmt"
-       "math/rand"
        "os"
        "path"
 
@@ -88,9 +87,3 @@ func PreloadSchema(e schema.Registry) error {
 func RandomTempDir() string {
        return path.Join(os.TempDir(), fmt.Sprintf("banyandb-embed-etcd-%s", 
uuid.New().String()))
 }
-
-func RandomUnixDomainListener() (string, string) {
-       i := rand.Uint64()
-       return fmt.Sprintf("%s://localhost:%d%06d", "unix", os.Getpid(), i),
-               fmt.Sprintf("%s://localhost:%d%06d", "unix", os.Getpid(), i+1)
-}

Reply via email to