This is an automated email from the ASF dual-hosted git repository.

warren pushed a commit to branch release-v0.12
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/release-v0.12 by this push:
     new 11c993e4 feat: starrocks plugin support pg array
11c993e4 is described below

commit 11c993e4be7df71c400a521926058f63cc90b661
Author: long2ice <[email protected]>
AuthorDate: Mon Sep 19 19:49:10 2022 +0800

    feat: starrocks plugin support pg array
---
 plugins/starrocks/tasks.go | 30 +++++++++++++++++++++---------
 plugins/starrocks/utils.go |  7 ++++++-
 2 files changed, 27 insertions(+), 10 deletions(-)

diff --git a/plugins/starrocks/tasks.go b/plugins/starrocks/tasks.go
index ece4131c..f2aa7c63 100644
--- a/plugins/starrocks/tasks.go
+++ b/plugins/starrocks/tasks.go
@@ -24,6 +24,7 @@ import (
        "github.com/apache/incubator-devlake/impl/dalgorm"
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/core/dal"
+       "github.com/lib/pq"
        "gorm.io/driver/mysql"
        "gorm.io/driver/postgres"
        "gorm.io/gorm"
@@ -102,12 +103,13 @@ func LoadData(c core.SubTaskContext) error {
 
        for _, table := range starrocksTables {
                starrocksTable := strings.TrimLeft(table, "_")
-               err = createTable(starrocks, db, starrocksTable, table, c, 
config.Extra)
+               var columnMap map[string]string
+               columnMap, err = createTable(starrocks, db, starrocksTable, 
table, c, config.Extra)
                if err != nil {
                        c.GetLogger().Error("create table %s in starrocks 
error: %s", table, err)
                        return err
                }
-               err = loadData(starrocks, c, starrocksTable, table, db, config)
+               err = loadData(starrocks, c, starrocksTable, table, columnMap, 
db, config)
                if err != nil {
                        c.GetLogger().Error("load data %s error: %s", table, 
err)
                        return err
@@ -115,10 +117,11 @@ func LoadData(c core.SubTaskContext) error {
        }
        return nil
 }
-func createTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, table 
string, c core.SubTaskContext, extra string) error {
+func createTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, table 
string, c core.SubTaskContext, extra string) (map[string]string, error) {
        columeMetas, err := db.GetColumns(&Table{name: table}, nil)
+       columnMap := make(map[string]string)
        if err != nil {
-               return err
+               return columnMap, err
        }
        var pks []string
        var columns []string
@@ -127,9 +130,11 @@ func createTable(starrocks *sql.DB, db dal.Dal, 
starrocksTable string, table str
                name := cm.Name()
                starrocksDatatype, ok := cm.ColumnType()
                if !ok {
-                       return fmt.Errorf("Get [%s] ColumeType Failed", name)
+                       return columnMap, fmt.Errorf("Get [%s] ColumeType 
Failed", name)
                }
-               column := fmt.Sprintf("`%s` %s", name, 
getDataType(starrocksDatatype))
+               dataType := getDataType(starrocksDatatype)
+               columnMap[name] = dataType
+               column := fmt.Sprintf("`%s` %s", name, dataType)
                columns = append(columns, column)
                isPrimaryKey, ok := cm.PrimaryKey()
                if isPrimaryKey && ok {
@@ -150,10 +155,10 @@ func createTable(starrocks *sql.DB, db dal.Dal, 
starrocksTable string, table str
        tableSql := fmt.Sprintf("create table if not exists `%s` ( %s ) %s", 
starrocksTable, strings.Join(columns, ","), extra)
        c.GetLogger().Info(tableSql)
        _, err = starrocks.Exec(tableSql)
-       return err
+       return columnMap, err
 }
 
-func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable string, 
table string, db dal.Dal, config *StarRocksConfig) error {
+func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable string, 
table string, columnMap map[string]string, db dal.Dal, config *StarRocksConfig) 
error {
        offset := 0
        starrocksTmpTable := starrocksTable + "_tmp"
        // create tmp table in starrocks
@@ -177,7 +182,14 @@ func loadData(starrocks *sql.DB, c core.SubTaskContext, 
starrocksTable string, t
                        columns := make([]interface{}, len(cols))
                        columnPointers := make([]interface{}, len(cols))
                        for i := range columns {
-                               columnPointers[i] = &columns[i]
+                               dataType := columnMap[cols[i]]
+                               if strings.HasPrefix(dataType, "array") {
+                                       var arr []string
+                                       columns[i] = &arr
+                                       columnPointers[i] = pq.Array(&arr)
+                               } else {
+                                       columnPointers[i] = &columns[i]
+                               }
                        }
                        err = rows.Scan(columnPointers...)
                        if err != nil {
diff --git a/plugins/starrocks/utils.go b/plugins/starrocks/utils.go
index a68c79e5..60833277 100644
--- a/plugins/starrocks/utils.go
+++ b/plugins/starrocks/utils.go
@@ -16,7 +16,10 @@ limitations under the License.
 */
 package main
 
-import "strings"
+import (
+       "fmt"
+       "strings"
+)
 
 func getTablesByDomainLayer(domainLayer string) []string {
        switch domainLayer {
@@ -101,6 +104,8 @@ func getDataType(dataType string) string {
                starrocksDatatype = "json"
        } else if dataType == "uuid" {
                starrocksDatatype = "char(36)"
+       } else if strings.HasSuffix(dataType, "[]") {
+               starrocksDatatype = fmt.Sprintf("array<%s>", 
getDataType(strings.Split(dataType, "[]")[0]))
        }
        return starrocksDatatype
 }

Reply via email to