This is an automated email from the ASF dual-hosted git repository.

zenlin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new dc21fb4  Supports synchronization in a single direction
dc21fb4 is described below

commit dc21fb456d8fad558e60b41036336859bf4b9403
Author: chinxââ <[email protected]>
AuthorDate: Tue Mar 24 16:40:42 2020 +0800

    Supports synchronization in a single direction
---
 syncer/pkg/utils/atomic_bool.go      |  55 +++++++++++++++++++
 syncer/pkg/utils/atomic_bool_test.go |  44 +++++++++++++++
 syncer/server/convert.go             |  13 ++---
 syncer/server/server.go              |  27 ++++++----
 syncer/task/idle/idle.go             |  47 ++++++++++++++++
 syncer/task/idle/idle_test.go        |  35 ++++++++++++
 syncer/task/task.go                  |  70 ++++++++++++++++++++++++
 syncer/task/task_test.go             |  51 ++++++++++++++++++
 syncer/task/ticker/ticker.go         | 101 +++++++++++++++++++++++++++++++++++
 syncer/task/ticker/ticker_test.go    |  60 +++++++++++++++++++++
 10 files changed, 486 insertions(+), 17 deletions(-)

diff --git a/syncer/pkg/utils/atomic_bool.go b/syncer/pkg/utils/atomic_bool.go
new file mode 100644
index 0000000..4336033
--- /dev/null
+++ b/syncer/pkg/utils/atomic_bool.go
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package utils
+
+import (
+       "sync"
+       "sync/atomic"
+)
+
+// AtomicBool is a boolean flag kept in a uint32. It is read through
+// sync/atomic (see Bool) and flipped under the mutex by DoToReverse.
+type AtomicBool struct {
+       // m is held by DoToReverse while it runs its callback and flips status
+       m      sync.Mutex
+       // status holds the value; Bool reports true when its lowest bit is set
+       status uint32
+}
+
+// NewAtomicBool returns an atomic bool
+func NewAtomicBool(b bool) *AtomicBool {
+       var status uint32
+       if b {
+               status = 1
+       }
+       return &AtomicBool{status: status}
+}
+
+// Bool atomically loads the status and reports whether its lowest bit is set.
+func (a *AtomicBool) Bool() bool {
+       return atomic.LoadUint32(&a.status)&1 == 1
+}
+
+// DoToReverse runs fn and then flips the status, but only if the current
+// status equals when; otherwise it returns without calling fn.
+//
+// Callers that pass the first check are serialized by the mutex, and the
+// status is checked again once the lock is held, so of several concurrent
+// callers using the same when only one runs fn per flip. The lock is released
+// with defer so that a panic inside fn cannot leave the mutex locked.
+//
+// fn is called while the mutex is held; it must not call DoToReverse on the
+// same AtomicBool with the same when, or it will block forever.
+func (a *AtomicBool) DoToReverse(when bool, fn func()) {
+       // Fast path: the status does not match, no need to take the lock.
+       if a.Bool() != when {
+               return
+       }
+
+       a.m.Lock()
+       defer a.m.Unlock()
+
+       // Another caller may have flipped the status while we were waiting.
+       if a.Bool() != when {
+               return
+       }
+
+       fn()
+       atomic.StoreUint32(&a.status, a.status^1)
+}
diff --git a/syncer/pkg/utils/atomic_bool_test.go b/syncer/pkg/utils/atomic_bool_test.go
new file mode 100644
index 0000000..e4455aa
--- /dev/null
+++ b/syncer/pkg/utils/atomic_bool_test.go
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package utils
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+// TestAtomicBool checks that DoToReverse flips the value when the expected
+// value matches the current one and leaves it untouched otherwise.
+func TestAtomicBool(t *testing.T) {
+       b := NewAtomicBool(false)
+       assert.False(t, b.Bool())
+
+       // current value is false and false is expected: flips to true
+       b.DoToReverse(false, func() {})
+       assert.True(t, b.Bool())
+
+       // current value is true and true is expected: flips back to false
+       b.DoToReverse(true, func() {})
+       assert.False(t, b.Bool())
+
+       nb := NewAtomicBool(true)
+       assert.True(t, nb.Bool())
+
+       // current value is true but false is expected: nothing changes
+       nb.DoToReverse(false, func() {})
+       assert.True(t, nb.Bool())
+
+       // current value is true and true is expected: flips to false
+       nb.DoToReverse(true, func() {})
+       assert.False(t, nb.Bool())
+}
diff --git a/syncer/server/convert.go b/syncer/server/convert.go
index 1f56762..45e3024 100644
--- a/syncer/server/convert.go
+++ b/syncer/server/convert.go
@@ -29,6 +29,7 @@ import (
        "github.com/apache/servicecomb-service-center/syncer/pkg/utils"
        "github.com/apache/servicecomb-service-center/syncer/plugins"
        "github.com/apache/servicecomb-service-center/syncer/serf"
+       "github.com/apache/servicecomb-service-center/syncer/task"
 )
 
 func convertSerfConfig(c *config.Config) *serf.Config {
@@ -64,16 +65,12 @@ func convertEtcdConfig(c *config.Config) *etcd.Config {
        return conf
 }
 
-func convertTickerInterval(c *config.Config) int {
-       strNum := ""
+func convertTaskOptions(c *config.Config) []task.Option {
+       opts := make([]task.Option, 0, len(c.Task.Params))
        for _, label := range c.Task.Params {
-               if label.Key == "interval" {
-                       strNum = label.Value
-                       break
-               }
+               opts = append(opts, task.WithAddKV(label.Key, label.Value))
        }
-       interval, _ := time.ParseDuration(strNum)
-       return int(interval.Seconds())
+       return opts
 }
 
 func convertSCConfigOption(c *config.Config) []plugins.SCConfigOption {
diff --git a/syncer/server/server.go b/syncer/server/server.go
index e5eafdf..17a2b60 100644
--- a/syncer/server/server.go
+++ b/syncer/server/server.go
@@ -31,15 +31,19 @@ import (
        "github.com/apache/servicecomb-service-center/syncer/etcd"
        "github.com/apache/servicecomb-service-center/syncer/grpc"
        "github.com/apache/servicecomb-service-center/syncer/pkg/syssig"
-       "github.com/apache/servicecomb-service-center/syncer/pkg/ticker"
        "github.com/apache/servicecomb-service-center/syncer/pkg/utils"
        "github.com/apache/servicecomb-service-center/syncer/plugins"
        "github.com/apache/servicecomb-service-center/syncer/serf"
        "github.com/apache/servicecomb-service-center/syncer/servicecenter"
+       "github.com/apache/servicecomb-service-center/syncer/task"
 
        // import plugins
        _ "github.com/apache/servicecomb-service-center/syncer/plugins/eureka"
        _ 
"github.com/apache/servicecomb-service-center/syncer/plugins/servicecenter"
+
+       // import task
+       _ "github.com/apache/servicecomb-service-center/syncer/task/idle"
+       _ "github.com/apache/servicecomb-service-center/syncer/task/ticker"
 )
 
 var stopChanErr = errors.New("stopped syncer by stopCh")
@@ -61,8 +65,8 @@ type Server struct {
        // Syncer configuration
        conf *config.Config
 
-       // Ticker for Syncer
-       tick *ticker.TaskTicker
+       // task for discovery
+       task task.Tasker
 
        // Wrap the servicecenter
        servicecenter servicecenter.Servicecenter
@@ -126,7 +130,12 @@ func (s *Server) Run(ctx context.Context) {
        s.servicecenter.SetStorageEngine(s.etcd.Storage())
 
        s.agent.RegisterEventHandler(s)
-       gopool.Go(s.tick.Start)
+
+       s.task.Handle(func() {
+               s.tickHandler(ctx)
+       })
+
+       s.task.Run(ctx)
 
        log.Info("start service done")
 
@@ -138,10 +147,6 @@ func (s *Server) Run(ctx context.Context) {
 
 // Stop Syncer Server
 func (s *Server) Stop() {
-       if s.tick != nil {
-               s.tick.Stop()
-       }
-
        if s.agent != nil {
                // removes the serf eventHandler
                s.agent.DeregisterEventHandler(s)
@@ -192,7 +197,11 @@ func (s *Server) initialization() (err error) {
        s.etcdConf = convertEtcdConfig(s.conf)
        s.etcd = etcd.NewAgent(s.etcdConf)
 
-       s.tick = ticker.NewTaskTicker(convertTickerInterval(s.conf), 
s.tickHandler)
+       s.task, err = task.GenerateTasker(s.conf.Task.Kind, 
convertTaskOptions(s.conf)...)
+       if err != nil {
+               log.Errorf(err, "Create tasker failed, %s", err)
+               return
+       }
 
        s.servicecenter, err = 
servicecenter.NewServicecenter(convertSCConfigOption(s.conf)...)
        if err != nil {
diff --git a/syncer/task/idle/idle.go b/syncer/task/idle/idle.go
new file mode 100644
index 0000000..dbf7fa3
--- /dev/null
+++ b/syncer/task/idle/idle.go
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package idle
+
+import (
+       "context"
+
+       "github.com/apache/servicecomb-service-center/syncer/task"
+)
+
+const (
+       // TaskName is the name under which the idle tasker is registered
+       TaskName = "idle"
+)
+
+// init registers NewIdle with the task package under TaskName.
+func init() {
+       task.RegisterTasker(TaskName, NewIdle)
+}
+
+// Idle is a tasker that does nothing: both Run and Handle are no-ops.
+type Idle struct{}
+
+// NewIdle returns an idle task; params is ignored and the error is always nil
+func NewIdle(params map[string]string) (task.Tasker, error) {
+       return &Idle{}, nil
+}
+
+// Run does nothing and returns immediately; ctx is not used
+func (t *Idle) Run(ctx context.Context) {}
+
+// Handle discards handler; the idle task never calls it
+func (t *Idle) Handle(handler func()) {}
diff --git a/syncer/task/idle/idle_test.go b/syncer/task/idle/idle_test.go
new file mode 100644
index 0000000..01fdbd6
--- /dev/null
+++ b/syncer/task/idle/idle_test.go
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package idle
+
+import (
+       "context"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+// TestIdle builds an idle tasker and exercises its two no-op methods.
+func TestIdle(t *testing.T) {
+       idler, err := NewIdle(map[string]string{})
+       assert.Nil(t, err)
+
+       noop := func() {}
+       idler.Handle(noop)
+       idler.Run(context.Background())
+}
diff --git a/syncer/task/task.go b/syncer/task/task.go
new file mode 100644
index 0000000..e2f9bd2
--- /dev/null
+++ b/syncer/task/task.go
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package task
+
+import (
+       "context"
+       "errors"
+
+       "github.com/apache/servicecomb-service-center/pkg/log"
+)
+
+// taskMgr maps a tasker name to the generator registered for it.
+var taskMgr = map[string]generator{}
+
+// generator builds a Tasker from a map of string parameters.
+type generator func(params map[string]string) (Tasker, error)
+
+// Tasker is the interface returned by a generator and by GenerateTasker.
+type Tasker interface {
+       // Run starts the task with the given context
+       Run(ctx context.Context)
+       // Handle hands the task the handler function it may call
+       Handle(handler func())
+}
+
+// RegisterTasker stores fn in the manager under name. Registering a name that
+// is already present logs a warning and replaces the previous generator.
+func RegisterTasker(name string, fn generator) {
+       _, exists := taskMgr[name]
+       if exists {
+               log.Warnf("task generator is already exist, name = %s", name)
+       }
+       taskMgr[name] = fn
+}
+
+// GenerateTasker looks up the generator registered under name and calls it
+// with the parameters collected from ops. When no generator is registered for
+// name, the error is logged together with the name and returned.
+func GenerateTasker(name string, ops ...Option) (Tasker, error) {
+       if fn, ok := taskMgr[name]; ok {
+               return fn(toMap(ops...))
+       }
+
+       err := errors.New("trigger generator is not found")
+       log.Errorf(err, "name = %s", name)
+       return nil, err
+}
+
+// Option mutates the parameter map that is handed to a task generator.
+type Option func(map[string]string)
+
+// WithAddKV returns an Option that sets key to value in the parameter map.
+func WithAddKV(key, value string) Option {
+       return func(params map[string]string) {
+               params[key] = value
+       }
+}
+
+// toMap applies the options in order to a fresh map and returns that map.
+func toMap(ops ...Option) map[string]string {
+       params := make(map[string]string, len(ops))
+       for i := range ops {
+               ops[i](params)
+       }
+       return params
+}
diff --git a/syncer/task/task_test.go b/syncer/task/task_test.go
new file mode 100644
index 0000000..7c63dd9
--- /dev/null
+++ b/syncer/task/task_test.go
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package task
+
+import (
+       "context"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+// empty is a Tasker with no-op methods, used only by the tests in this file
+type empty struct{}
+
+// newEmpty is a generator that ignores params and always returns an empty task
+func newEmpty(params map[string]string) (Tasker, error) {
+       return &empty{}, nil
+}
+
+// Run does nothing
+func (t *empty) Run(ctx context.Context) {}
+
+// Handle discards the handler
+func (t *empty) Handle(handler func()) {}
+
+// TestTasker checks that GenerateTasker fails for an unregistered name and
+// succeeds once a generator has been registered under that name.
+func TestTasker(t *testing.T) {
+       taskName := "empty"
+       _, err := GenerateTasker(taskName)
+       assert.NotNil(t, err)
+
+       // registering the same name twice exercises the duplicate-name branch
+       RegisterTasker(taskName, newEmpty)
+       RegisterTasker(taskName, newEmpty)
+
+       _, err = GenerateTasker(taskName, WithAddKV("test", "test"))
+       assert.Nil(t, err)
+}
diff --git a/syncer/task/ticker/ticker.go b/syncer/task/ticker/ticker.go
new file mode 100644
index 0000000..34b6ac9
--- /dev/null
+++ b/syncer/task/ticker/ticker.go
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ticker
+
+import (
+       "context"
+       "time"
+
+       "github.com/apache/servicecomb-service-center/pkg/log"
+       "github.com/apache/servicecomb-service-center/syncer/pkg/utils"
+       "github.com/apache/servicecomb-service-center/syncer/task"
+       "github.com/pkg/errors"
+)
+
+const (
+       // TaskName is the name under which the ticker tasker is registered
+       TaskName    = "ticker"
+       // intervalKey is the params key that NewTicker reads the interval from
+       intervalKey = "interval"
+)
+
+// init registers NewTicker with the task package under TaskName.
+func init() {
+       task.RegisterTasker(TaskName, NewTicker)
+}
+
+// Ticker is a tasker that calls its handler once when Run starts it and then
+// on every tick of a time.Ticker until the context passed to Run is done.
+type Ticker struct {
+       // interval is the period passed to time.NewTicker in Run
+       interval time.Duration
+       // handler is the function stored by Handle
+       handler  func()
+       // running is flipped to true by Run and back to false by stop
+       running  *utils.AtomicBool
+       // ticker is created in Run and stopped in stop
+       ticker   *time.Ticker
+}
+
+// NewTicker returns a ticker as a tasker
+func NewTicker(params map[string]string) (task.Tasker, error) {
+       val, ok := params[intervalKey]
+       if !ok {
+               return nil, errors.New("ticker: param interval notfound")
+       }
+
+       interval, err := time.ParseDuration(val)
+       if err != nil {
+               return nil, errors.Wrap(err, "ticker: parse interval duration 
failed")
+       }
+
+       return &Ticker{
+               interval: interval,
+               running:  utils.NewAtomicBool(false),
+       }, nil
+}
+
+// Run starts the ticker task unless it is already running: it creates the
+// time.Ticker, calls the handler once right away, and starts a goroutine
+// (wait) that calls the handler on every tick until ctx is done.
+//
+// The first handler call happens synchronously, before Run returns. If no
+// handler has been set with Handle, Run logs a warning and starts nothing
+// instead of panicking on the nil function.
+func (t *Ticker) Run(ctx context.Context) {
+       if t.handler == nil {
+               log.Warnf("ticker: handler is not set, task is not started")
+               return
+       }
+
+       t.running.DoToReverse(false, func() {
+               t.ticker = time.NewTicker(t.interval)
+               t.handler()
+               go t.wait(ctx)
+       })
+}
+
+// Handle stores handler as the function the ticker calls; it is invoked once
+// by Run and then on each tick by wait
+func (t *Ticker) Handle(handler func()) {
+       t.handler = handler
+}
+
+// wait loops until ctx is done, calling the handler each time the ticker
+// fires; when ctx is done it calls stop and returns.
+func (t *Ticker) wait(ctx context.Context) {
+       for {
+               select {
+               case <-t.ticker.C:
+                       t.handler()
+               case <-ctx.Done():
+                       t.stop()
+                       return
+               }
+       }
+}
+
+// stop halts the time.Ticker and flips running back to false, but only if the
+// task is currently marked as running. "ticker done" is logged on every call,
+// whether or not anything was stopped.
+func (t *Ticker) stop() {
+       t.running.DoToReverse(true, func() {
+               if t.ticker != nil {
+                       t.ticker.Stop()
+               }
+               log.Info("ticker stop")
+       })
+
+       log.Info("ticker done")
+}
diff --git a/syncer/task/ticker/ticker_test.go b/syncer/task/ticker/ticker_test.go
new file mode 100644
index 0000000..f2847de
--- /dev/null
+++ b/syncer/task/ticker/ticker_test.go
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ticker
+
+import (
+       "context"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+)
+
+// TestTicker checks the parameter validation in NewTicker and that a started
+// ticker calls its handler once at start and again on the first tick.
+func TestTicker(t *testing.T) {
+       // missing interval
+       params := map[string]string{}
+       _, err := NewTicker(params)
+       assert.NotNil(t, err)
+
+       // interval that is not a valid duration
+       params[intervalKey] = "1ams"
+       _, err = NewTicker(params)
+       assert.NotNil(t, err)
+
+       params[intervalKey] = "1s"
+
+       task, err := NewTicker(params)
+       assert.Nil(t, err)
+
+       // The handler is called once by Run and then once per tick. Close the
+       // channel on the second call only, so that a further tick arriving
+       // before the ticker is stopped cannot close it twice and panic.
+       calls := 0
+       stopped := make(chan struct{})
+       task.Handle(func() {
+               calls++
+               if calls == 2 {
+                       close(stopped)
+               }
+       })
+
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+       go task.Run(ctx)
+
+       // Receiving here proves the handler ran at start and on the first tick.
+       <-stopped
+
+       // Cancelling the context is what makes the ticker goroutine stop.
+       cancel()
+       // Give that goroutine time to observe the cancellation.
+       time.Sleep(time.Second)
+}

Reply via email to