This is an automated email from the ASF dual-hosted git repository.
klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 50204db1a feat: preventing database sharing (#3881)
50204db1a is described below
commit 50204db1abc3e96402883e1d425906ce25d37425
Author: Klesh Wong <[email protected]>
AuthorDate: Thu Dec 8 16:38:35 2022 +0800
feat: preventing database sharing (#3881)
* feat: preventing database sharing
* feat: support pg database locking
* refactor: move db-locking to service module
---
impl/dalgorm/dalgorm.go | 33 ++++++++++++++-
impl/dalgorm/dalgorm_transaction.go | 55 ++++++++++++++++++++++++
models/locking.go | 51 +++++++++++++++++++++++
plugins/core/dal/dal.go | 23 ++++++++++
runner/db.go | 18 ++++++--
services/init.go | 23 ++++++++--
services/locking.go | 83 +++++++++++++++++++++++++++++++++++++
7 files changed, 277 insertions(+), 9 deletions(-)
diff --git a/impl/dalgorm/dalgorm.go b/impl/dalgorm/dalgorm.go
index c062b875a..8b0887bca 100644
--- a/impl/dalgorm/dalgorm.go
+++ b/impl/dalgorm/dalgorm.go
@@ -31,11 +31,13 @@ import (
"gorm.io/gorm/clause"
)
-// Dalgorm FIXME ...
+// Dalgorm implements the dal.Dal interface with gorm
type Dalgorm struct {
db *gorm.DB
}
+var _ dal.Dal = (*Dalgorm)(nil)
+
func transformParams(params []interface{}) []interface{} {
tp := make([]interface{}, 0, len(params))
@@ -98,6 +100,19 @@ func buildTx(tx *gorm.DB, clauses []dal.Clause) *gorm.DB {
tx = tx.Group(d.(string))
case dal.HavingClause:
tx = tx.Having(d.(dal.DalClause).Expr,
transformParams(d.(dal.DalClause).Params)...)
+ case dal.LockClause:
+ locking := clause.Locking{}
+ params := d.([]bool)
+ write := params[0]
+ if write {
+ locking.Strength = "UPDATE"
+ }
+ nowait := params[1]
+ if nowait {
+ locking.Options = "NOWAIT"
+ }
+
+ tx = tx.Clauses(locking)
}
}
return tx
@@ -326,7 +341,21 @@ func (d *Dalgorm) Dialect() string {
return d.db.Dialector.Name()
}
-// NewDalgorm FIXME ...
+// Session creates a new manual transaction for special scenarios
+func (d *Dalgorm) Session(config dal.SessionConfig) dal.Dal {
+ session := d.db.Session(&gorm.Session{
+ PrepareStmt: config.PrepareStmt,
+ SkipDefaultTransaction: config.SkipDefaultTransaction,
+ })
+ return NewDalgorm(session)
+}
+
+// Begin create a new transaction
+func (d *Dalgorm) Begin() dal.Transaction {
+ return newTransaction(d)
+}
+
+// NewDalgorm creates a *Dalgorm
func NewDalgorm(db *gorm.DB) *Dalgorm {
return &Dalgorm{db}
}
diff --git a/impl/dalgorm/dalgorm_transaction.go
b/impl/dalgorm/dalgorm_transaction.go
new file mode 100644
index 000000000..d80e71630
--- /dev/null
+++ b/impl/dalgorm/dalgorm_transaction.go
@@ -0,0 +1,55 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package dalgorm
+
+import (
+ "github.com/apache/incubator-devlake/errors"
+ "github.com/apache/incubator-devlake/plugins/core/dal"
+)
+
+// DalgormTransaction represents a gorm transaction which using the same
underlying
+// session for all queries
+type DalgormTransaction struct {
+ *Dalgorm
+}
+
+var _ dal.Transaction = (*DalgormTransaction)(nil)
+
+// Rollback the transaction
+func (t *DalgormTransaction) Rollback() errors.Error {
+ r := t.db.Rollback()
+ if r.Error != nil {
+ return errors.Default.Wrap(r.Error, "failed to rollback
transaction")
+ }
+ return nil
+}
+
+// Commit the transaction
+func (t *DalgormTransaction) Commit() errors.Error {
+ r := t.db.Commit()
+ if r.Error != nil {
+ return errors.Default.Wrap(r.Error, "failed to commit
transaction")
+ }
+ return nil
+}
+
+func newTransaction(dalgorm *Dalgorm) *DalgormTransaction {
+ return &DalgormTransaction{
+ Dalgorm: NewDalgorm(dalgorm.db.Begin()),
+ }
+}
diff --git a/models/locking.go b/models/locking.go
new file mode 100644
index 000000000..3228eced3
--- /dev/null
+++ b/models/locking.go
@@ -0,0 +1,51 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package models
+
+import "time"
+
+// LockingHistory is desgned for preventing mutiple delake instances from
sharing the same database which may cause
+// problems like #3537, #3466. It works by the following step:
+//
+// 1. Each devlake insert a record to this table whie `Succeeded=false`
+// 2. Then it should try to lock the LockingStub table
+// 3. Update the record with `Succeeded=true` if it had obtained the lock
successfully
+//
+// NOTE: it works IFF all devlake instances obey the principle described
above, in other words, this mechanism can
+// not prevent older versions from sharing the same database
+type LockingHistory struct {
+ ID uint64 `gorm:"primaryKey" json:"id"`
+ HostName string
+ Version string
+ Succeeded bool
+ CreatedAt time.Time `json:"createdAt"`
+ UpdatedAt time.Time `json:"updatedAt"`
+}
+
+func (LockingHistory) TableName() string {
+ return "_devlake_locking_history"
+}
+
+// LockingStub does nothing but offer a locking target
+type LockingStub struct {
+ Stub string
+}
+
+func (LockingStub) TableName() string {
+ return "_devlake_locking_stub"
+}
diff --git a/plugins/core/dal/dal.go b/plugins/core/dal/dal.go
index 9c36f3975..d5fa37e9c 100644
--- a/plugins/core/dal/dal.go
+++ b/plugins/core/dal/dal.go
@@ -76,6 +76,12 @@ type ColumnMeta interface {
DefaultValue() (value string, ok bool)
}
+// SessionConfig specify options for the session
+type SessionConfig struct {
+ PrepareStmt bool
+ SkipDefaultTransaction bool
+}
+
// Dal aims to facilitate an isolation between DBS and our System by defining
a set of operations should a DBS provide
type Dal interface {
// AutoMigrate runs auto migration for given entity
@@ -130,6 +136,16 @@ type Dal interface {
DropIndexes(table string, indexes ...string) errors.Error
// Dialect returns the dialect of current database
Dialect() string
+ // Session creates a new manual session for special scenarios
+ Session(config SessionConfig) Dal
+ // Begin create a new transaction
+ Begin() Transaction
+}
+
+type Transaction interface {
+ Dal
+ Rollback() errors.Error
+ Commit() errors.Error
}
type Rows interface {
@@ -268,3 +284,10 @@ const HavingClause string = "Having"
func Having(clause string, params ...interface{}) Clause {
return Clause{Type: HavingClause, Data: DalClause{clause, params}}
}
+
+const LockClause string = "Lock"
+
+// Having creates a new Having clause
+func Lock(write bool, nowait bool) Clause {
+ return Clause{Type: LockClause, Data: []bool{write, nowait}}
+}
diff --git a/runner/db.go b/runner/db.go
index 43d35cd27..3357b104d 100644
--- a/runner/db.go
+++ b/runner/db.go
@@ -19,12 +19,14 @@ package runner
import (
"fmt"
- "github.com/apache/incubator-devlake/errors"
"net/url"
"strings"
"time"
+ "github.com/apache/incubator-devlake/errors"
+
"github.com/apache/incubator-devlake/plugins/core"
+ "github.com/apache/incubator-devlake/plugins/core/dal"
"github.com/spf13/viper"
"gorm.io/driver/mysql"
"gorm.io/driver/postgres"
@@ -32,8 +34,16 @@ import (
gormLogger "gorm.io/gorm/logger"
)
-// NewGormDb FIXME ...
+// NewGormDb creates a new *gorm.DB and set it up properly
func NewGormDb(config *viper.Viper, logger core.Logger) (*gorm.DB,
errors.Error) {
+ return NewGormDbEx(config, logger, &dal.SessionConfig{
+ PrepareStmt: true,
+ SkipDefaultTransaction: true,
+ })
+}
+
+// NewGormDbEx acts like NewGormDb but accept extra sessionConfig
+func NewGormDbEx(config *viper.Viper, logger core.Logger, sessionConfig
*dal.SessionConfig) (*gorm.DB, errors.Error) {
dbLoggingLevel := gormLogger.Error
switch strings.ToLower(config.GetString("DB_LOGGING_LEVEL")) {
case "silent":
@@ -63,8 +73,8 @@ func NewGormDb(config *viper.Viper, logger core.Logger)
(*gorm.DB, errors.Error)
Colorful: true, //
Disable color
},
),
- // most of our operation are in batch, this can improve
performance
- PrepareStmt: true,
+ PrepareStmt: sessionConfig.PrepareStmt,
+ SkipDefaultTransaction: sessionConfig.SkipDefaultTransaction,
}
dbUrl := config.GetString("DB_URL")
if dbUrl == "" {
diff --git a/services/init.go b/services/init.go
index 25ac9ab45..27e8f91c5 100644
--- a/services/init.go
+++ b/services/init.go
@@ -20,15 +20,17 @@ package services
import (
"time"
+ "sync"
+
"github.com/apache/incubator-devlake/errors"
"github.com/apache/incubator-devlake/impl"
- "sync"
"github.com/apache/incubator-devlake/config"
"github.com/apache/incubator-devlake/impl/dalgorm"
"github.com/apache/incubator-devlake/logger"
"github.com/apache/incubator-devlake/models/migrationscripts"
"github.com/apache/incubator-devlake/plugins/core"
+ "github.com/apache/incubator-devlake/plugins/core/dal"
"github.com/apache/incubator-devlake/runner"
"github.com/robfig/cron/v3"
"github.com/spf13/viper"
@@ -55,15 +57,30 @@ func Init() {
if err != nil {
panic(err)
}
- basicRes = impl.NewDefaultBasicRes(cfg, log, dalgorm.NewDalgorm(db))
+ // gorm doesn't support creating a PrepareStmt=false session from a
PrepareStmt=true
+ // but the lockDatabase needs PrepareStmt=false for table locking, we
have to deal with it here
+ lockingDb, err := runner.NewGormDbEx(cfg,
logger.Global.Nested("migrator db"), &dal.SessionConfig{
+ PrepareStmt: false,
+ SkipDefaultTransaction: true,
+ })
+ if err != nil {
+ panic(err)
+ }
+ err = lockDatabase(dalgorm.NewDalgorm(lockingDb))
+ if err != nil {
+ panic(err)
+ }
- // initialize db migrator singletone
+ basicRes = impl.NewDefaultBasicRes(cfg, log, dalgorm.NewDalgorm(db))
+ // initialize db migrator
migrator, err = runner.InitMigrator(basicRes)
if err != nil {
panic(err)
}
+ log.Info("migration initialized")
migrator.Register(migrationscripts.All(), "Framework")
+ // now,
// load plugins
err = runner.LoadPlugins(
cfg.GetString("PLUGIN_DIR"),
diff --git a/services/locking.go b/services/locking.go
new file mode 100644
index 000000000..f97d76a91
--- /dev/null
+++ b/services/locking.go
@@ -0,0 +1,83 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package services
+
+import (
+ "os"
+ "time"
+
+ "github.com/apache/incubator-devlake/errors"
+ "github.com/apache/incubator-devlake/models"
+ "github.com/apache/incubator-devlake/plugins/core/dal"
+ "github.com/apache/incubator-devlake/version"
+)
+
+// long last transaction for database locking
+var lockingTx dal.Transaction
+
+// lockDatabase prevents multiple devlake instances from sharing the same
lockDatabase
+// check the models.LockingHistory for the detail
+func lockDatabase(db dal.Dal) errors.Error {
+ // first, register the instance
+ err := db.AutoMigrate(&models.LockingHistory{})
+ if err != nil {
+ return err
+ }
+ hostName, e := os.Hostname()
+ if e != nil {
+ return errors.Convert(e)
+ }
+ lockingHistory := &models.LockingHistory{
+ HostName: hostName,
+ Version: version.Version,
+ }
+ err = db.Create(lockingHistory)
+ if err != nil {
+ return err
+ }
+ // 2. obtain the lock
+ err = db.AutoMigrate(&models.LockingStub{})
+ if err != nil {
+ return err
+ }
+ lockingTx = db.Begin()
+ c := make(chan error, 1)
+
+ // This prevent multiple devlake instances from sharing the same
database by locking the migration history table
+ // However, it would not work if any older devlake instances were
already using the database.
+ go func() {
+ switch db.Dialect() {
+ case "mysql":
+ c <- lockingTx.Exec("LOCK TABLE _devlake_locking_stub
WRITE")
+ case "postgres":
+ c <- lockingTx.Exec("LOCK TABLE _devlake_locking_stub
IN EXCLUSIVE MODE")
+ }
+ }()
+
+ select {
+ case err := <-c:
+ if err != nil {
+ return errors.Convert(err)
+ }
+ case <-time.After(2 * time.Second):
+ return errors.Default.New("locking _devlake_locking_stub
timeout, the database might be locked by another devlake instance")
+ }
+ // 3. update the record
+ lockingHistory.Succeeded = true
+ return db.Update(lockingHistory)
+}