This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 50204db1a feat: preventing database sharing (#3881)
50204db1a is described below

commit 50204db1abc3e96402883e1d425906ce25d37425
Author: Klesh Wong <[email protected]>
AuthorDate: Thu Dec 8 16:38:35 2022 +0800

    feat: preventing database sharing (#3881)
    
    * feat: preventing database sharing
    
    * feat: support pg database locking
    
    * refactor: move db-locking to service module
---
 impl/dalgorm/dalgorm.go             | 33 ++++++++++++++-
 impl/dalgorm/dalgorm_transaction.go | 55 ++++++++++++++++++++++++
 models/locking.go                   | 51 +++++++++++++++++++++++
 plugins/core/dal/dal.go             | 23 ++++++++++
 runner/db.go                        | 18 ++++++--
 services/init.go                    | 23 ++++++++--
 services/locking.go                 | 83 +++++++++++++++++++++++++++++++++++++
 7 files changed, 277 insertions(+), 9 deletions(-)

diff --git a/impl/dalgorm/dalgorm.go b/impl/dalgorm/dalgorm.go
index c062b875a..8b0887bca 100644
--- a/impl/dalgorm/dalgorm.go
+++ b/impl/dalgorm/dalgorm.go
@@ -31,11 +31,13 @@ import (
        "gorm.io/gorm/clause"
 )
 
-// Dalgorm FIXME ...
+// Dalgorm implements the dal.Dal interface with gorm
 type Dalgorm struct {
        db *gorm.DB
 }
 
+var _ dal.Dal = (*Dalgorm)(nil)
+
 func transformParams(params []interface{}) []interface{} {
        tp := make([]interface{}, 0, len(params))
 
@@ -98,6 +100,19 @@ func buildTx(tx *gorm.DB, clauses []dal.Clause) *gorm.DB {
                        tx = tx.Group(d.(string))
                case dal.HavingClause:
                        tx = tx.Having(d.(dal.DalClause).Expr, 
transformParams(d.(dal.DalClause).Params)...)
+               case dal.LockClause:
+                       locking := clause.Locking{}
+                       params := d.([]bool)
+                       write := params[0]
+                       if write {
+                               locking.Strength = "UPDATE"
+                       }
+                       nowait := params[1]
+                       if nowait {
+                               locking.Options = "NOWAIT"
+                       }
+
+                       tx = tx.Clauses(locking)
                }
        }
        return tx
@@ -326,7 +341,21 @@ func (d *Dalgorm) Dialect() string {
        return d.db.Dialector.Name()
 }
 
-// NewDalgorm FIXME ...
+// Session creates a new manual transaction for special scenarios
+func (d *Dalgorm) Session(config dal.SessionConfig) dal.Dal {
+       session := d.db.Session(&gorm.Session{
+               PrepareStmt:            config.PrepareStmt,
+               SkipDefaultTransaction: config.SkipDefaultTransaction,
+       })
+       return NewDalgorm(session)
+}
+
+// Begin create a new transaction
+func (d *Dalgorm) Begin() dal.Transaction {
+       return newTransaction(d)
+}
+
+// NewDalgorm creates a *Dalgorm
 func NewDalgorm(db *gorm.DB) *Dalgorm {
        return &Dalgorm{db}
 }
diff --git a/impl/dalgorm/dalgorm_transaction.go 
b/impl/dalgorm/dalgorm_transaction.go
new file mode 100644
index 000000000..d80e71630
--- /dev/null
+++ b/impl/dalgorm/dalgorm_transaction.go
@@ -0,0 +1,55 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package dalgorm
+
+import (
+       "github.com/apache/incubator-devlake/errors"
+       "github.com/apache/incubator-devlake/plugins/core/dal"
+)
+
+// DalgormTransaction represents a gorm transaction which using the same 
underlying
+// session for all queries
+type DalgormTransaction struct {
+       *Dalgorm
+}
+
+var _ dal.Transaction = (*DalgormTransaction)(nil)
+
+// Rollback the transaction
+func (t *DalgormTransaction) Rollback() errors.Error {
+       r := t.db.Rollback()
+       if r.Error != nil {
+               return errors.Default.Wrap(r.Error, "failed to rollback 
transaction")
+       }
+       return nil
+}
+
+// Commit the transaction
+func (t *DalgormTransaction) Commit() errors.Error {
+       r := t.db.Commit()
+       if r.Error != nil {
+               return errors.Default.Wrap(r.Error, "failed to commit 
transaction")
+       }
+       return nil
+}
+
+func newTransaction(dalgorm *Dalgorm) *DalgormTransaction {
+       return &DalgormTransaction{
+               Dalgorm: NewDalgorm(dalgorm.db.Begin()),
+       }
+}
diff --git a/models/locking.go b/models/locking.go
new file mode 100644
index 000000000..3228eced3
--- /dev/null
+++ b/models/locking.go
@@ -0,0 +1,51 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package models
+
+import "time"
+
+// LockingHistory is desgned for preventing mutiple delake instances from 
sharing the same database which may cause
+// problems like #3537, #3466. It works by the following step:
+//
+// 1. Each devlake insert a record to this table whie `Succeeded=false`
+// 2. Then it should try to lock the LockingStub table
+// 3. Update the record with `Succeeded=true` if it had obtained the lock 
successfully
+//
+// NOTE: it works IFF all devlake instances obey the principle described 
above, in other words, this mechanism can
+// not prevent older versions from sharing the same database
+type LockingHistory struct {
+       ID        uint64 `gorm:"primaryKey" json:"id"`
+       HostName  string
+       Version   string
+       Succeeded bool
+       CreatedAt time.Time `json:"createdAt"`
+       UpdatedAt time.Time `json:"updatedAt"`
+}
+
+func (LockingHistory) TableName() string {
+       return "_devlake_locking_history"
+}
+
+// LockingStub does nothing but offer a locking target
+type LockingStub struct {
+       Stub string
+}
+
+func (LockingStub) TableName() string {
+       return "_devlake_locking_stub"
+}
diff --git a/plugins/core/dal/dal.go b/plugins/core/dal/dal.go
index 9c36f3975..d5fa37e9c 100644
--- a/plugins/core/dal/dal.go
+++ b/plugins/core/dal/dal.go
@@ -76,6 +76,12 @@ type ColumnMeta interface {
        DefaultValue() (value string, ok bool)
 }
 
+// SessionConfig specify options for the session
+type SessionConfig struct {
+       PrepareStmt            bool
+       SkipDefaultTransaction bool
+}
+
 // Dal aims to facilitate an isolation between DBS and our System by defining 
a set of operations should a DBS provide
 type Dal interface {
        // AutoMigrate runs auto migration for given entity
@@ -130,6 +136,16 @@ type Dal interface {
        DropIndexes(table string, indexes ...string) errors.Error
        // Dialect returns the dialect of current database
        Dialect() string
+       // Session creates a new manual session for special scenarios
+       Session(config SessionConfig) Dal
+       // Begin create a new transaction
+       Begin() Transaction
+}
+
+type Transaction interface {
+       Dal
+       Rollback() errors.Error
+       Commit() errors.Error
 }
 
 type Rows interface {
@@ -268,3 +284,10 @@ const HavingClause string = "Having"
 func Having(clause string, params ...interface{}) Clause {
        return Clause{Type: HavingClause, Data: DalClause{clause, params}}
 }
+
+const LockClause string = "Lock"
+
+// Having creates a new Having clause
+func Lock(write bool, nowait bool) Clause {
+       return Clause{Type: LockClause, Data: []bool{write, nowait}}
+}
diff --git a/runner/db.go b/runner/db.go
index 43d35cd27..3357b104d 100644
--- a/runner/db.go
+++ b/runner/db.go
@@ -19,12 +19,14 @@ package runner
 
 import (
        "fmt"
-       "github.com/apache/incubator-devlake/errors"
        "net/url"
        "strings"
        "time"
 
+       "github.com/apache/incubator-devlake/errors"
+
        "github.com/apache/incubator-devlake/plugins/core"
+       "github.com/apache/incubator-devlake/plugins/core/dal"
        "github.com/spf13/viper"
        "gorm.io/driver/mysql"
        "gorm.io/driver/postgres"
@@ -32,8 +34,16 @@ import (
        gormLogger "gorm.io/gorm/logger"
 )
 
-// NewGormDb FIXME ...
+// NewGormDb creates a new *gorm.DB and set it up properly
 func NewGormDb(config *viper.Viper, logger core.Logger) (*gorm.DB, 
errors.Error) {
+       return NewGormDbEx(config, logger, &dal.SessionConfig{
+               PrepareStmt:            true,
+               SkipDefaultTransaction: true,
+       })
+}
+
+// NewGormDbEx acts like NewGormDb but accept extra sessionConfig
+func NewGormDbEx(config *viper.Viper, logger core.Logger, sessionConfig 
*dal.SessionConfig) (*gorm.DB, errors.Error) {
        dbLoggingLevel := gormLogger.Error
        switch strings.ToLower(config.GetString("DB_LOGGING_LEVEL")) {
        case "silent":
@@ -63,8 +73,8 @@ func NewGormDb(config *viper.Viper, logger core.Logger) 
(*gorm.DB, errors.Error)
                                Colorful:                  true,           // 
Disable color
                        },
                ),
-               // most of our operation are in batch, this can improve 
performance
-               PrepareStmt: true,
+               PrepareStmt:            sessionConfig.PrepareStmt,
+               SkipDefaultTransaction: sessionConfig.SkipDefaultTransaction,
        }
        dbUrl := config.GetString("DB_URL")
        if dbUrl == "" {
diff --git a/services/init.go b/services/init.go
index 25ac9ab45..27e8f91c5 100644
--- a/services/init.go
+++ b/services/init.go
@@ -20,15 +20,17 @@ package services
 import (
        "time"
 
+       "sync"
+
        "github.com/apache/incubator-devlake/errors"
        "github.com/apache/incubator-devlake/impl"
-       "sync"
 
        "github.com/apache/incubator-devlake/config"
        "github.com/apache/incubator-devlake/impl/dalgorm"
        "github.com/apache/incubator-devlake/logger"
        "github.com/apache/incubator-devlake/models/migrationscripts"
        "github.com/apache/incubator-devlake/plugins/core"
+       "github.com/apache/incubator-devlake/plugins/core/dal"
        "github.com/apache/incubator-devlake/runner"
        "github.com/robfig/cron/v3"
        "github.com/spf13/viper"
@@ -55,15 +57,30 @@ func Init() {
        if err != nil {
                panic(err)
        }
-       basicRes = impl.NewDefaultBasicRes(cfg, log, dalgorm.NewDalgorm(db))
+       // gorm doesn't support creating a PrepareStmt=false session from a 
PrepareStmt=true
+       // but the lockDatabase needs PrepareStmt=false for table locking, we 
have to deal with it here
+       lockingDb, err := runner.NewGormDbEx(cfg, 
logger.Global.Nested("migrator db"), &dal.SessionConfig{
+               PrepareStmt:            false,
+               SkipDefaultTransaction: true,
+       })
+       if err != nil {
+               panic(err)
+       }
+       err = lockDatabase(dalgorm.NewDalgorm(lockingDb))
+       if err != nil {
+               panic(err)
+       }
 
-       // initialize db migrator singletone
+       basicRes = impl.NewDefaultBasicRes(cfg, log, dalgorm.NewDalgorm(db))
+       // initialize db migrator
        migrator, err = runner.InitMigrator(basicRes)
        if err != nil {
                panic(err)
        }
+       log.Info("migration initialized")
        migrator.Register(migrationscripts.All(), "Framework")
 
+       // now,
        // load plugins
        err = runner.LoadPlugins(
                cfg.GetString("PLUGIN_DIR"),
diff --git a/services/locking.go b/services/locking.go
new file mode 100644
index 000000000..f97d76a91
--- /dev/null
+++ b/services/locking.go
@@ -0,0 +1,83 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package services
+
+import (
+       "os"
+       "time"
+
+       "github.com/apache/incubator-devlake/errors"
+       "github.com/apache/incubator-devlake/models"
+       "github.com/apache/incubator-devlake/plugins/core/dal"
+       "github.com/apache/incubator-devlake/version"
+)
+
+// long last transaction for database locking
+var lockingTx dal.Transaction
+
+// lockDatabase prevents multiple devlake instances from sharing the same 
lockDatabase
+// check the models.LockingHistory for the detail
+func lockDatabase(db dal.Dal) errors.Error {
+       // first, register the instance
+       err := db.AutoMigrate(&models.LockingHistory{})
+       if err != nil {
+               return err
+       }
+       hostName, e := os.Hostname()
+       if e != nil {
+               return errors.Convert(e)
+       }
+       lockingHistory := &models.LockingHistory{
+               HostName: hostName,
+               Version:  version.Version,
+       }
+       err = db.Create(lockingHistory)
+       if err != nil {
+               return err
+       }
+       // 2. obtain the lock
+       err = db.AutoMigrate(&models.LockingStub{})
+       if err != nil {
+               return err
+       }
+       lockingTx = db.Begin()
+       c := make(chan error, 1)
+
+       // This prevent multiple devlake instances from sharing the same 
database by locking the migration history table
+       // However, it would not work if any older devlake instances were 
already using the database.
+       go func() {
+               switch db.Dialect() {
+               case "mysql":
+                       c <- lockingTx.Exec("LOCK TABLE _devlake_locking_stub 
WRITE")
+               case "postgres":
+                       c <- lockingTx.Exec("LOCK TABLE _devlake_locking_stub 
IN EXCLUSIVE MODE")
+               }
+       }()
+
+       select {
+       case err := <-c:
+               if err != nil {
+                       return errors.Convert(err)
+               }
+       case <-time.After(2 * time.Second):
+               return errors.Default.New("locking _devlake_locking_stub 
timeout, the database might be locked by another devlake instance")
+       }
+       // 3. update the record
+       lockingHistory.Succeeded = true
+       return db.Update(lockingHistory)
+}

Reply via email to