This is an automated email from the ASF dual-hosted git repository.
warren pushed a commit to branch release-v0.12
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/release-v0.12 by this push:
new 11c993e4 feat: starrocks plugin support pg array
11c993e4 is described below
commit 11c993e4be7df71c400a521926058f63cc90b661
Author: long2ice <[email protected]>
AuthorDate: Mon Sep 19 19:49:10 2022 +0800
feat: starrocks plugin support pg array
---
plugins/starrocks/tasks.go | 30 +++++++++++++++++++++---------
plugins/starrocks/utils.go | 7 ++++++-
2 files changed, 27 insertions(+), 10 deletions(-)
diff --git a/plugins/starrocks/tasks.go b/plugins/starrocks/tasks.go
index ece4131c..f2aa7c63 100644
--- a/plugins/starrocks/tasks.go
+++ b/plugins/starrocks/tasks.go
@@ -24,6 +24,7 @@ import (
"github.com/apache/incubator-devlake/impl/dalgorm"
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/core/dal"
+ "github.com/lib/pq"
"gorm.io/driver/mysql"
"gorm.io/driver/postgres"
"gorm.io/gorm"
@@ -102,12 +103,13 @@ func LoadData(c core.SubTaskContext) error {
for _, table := range starrocksTables {
starrocksTable := strings.TrimLeft(table, "_")
- err = createTable(starrocks, db, starrocksTable, table, c,
config.Extra)
+ var columnMap map[string]string
+ columnMap, err = createTable(starrocks, db, starrocksTable,
table, c, config.Extra)
if err != nil {
c.GetLogger().Error("create table %s in starrocks
error: %s", table, err)
return err
}
- err = loadData(starrocks, c, starrocksTable, table, db, config)
+ err = loadData(starrocks, c, starrocksTable, table, columnMap,
db, config)
if err != nil {
c.GetLogger().Error("load data %s error: %s", table,
err)
return err
@@ -115,10 +117,11 @@ func LoadData(c core.SubTaskContext) error {
}
return nil
}
-func createTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, table
string, c core.SubTaskContext, extra string) error {
+func createTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, table
string, c core.SubTaskContext, extra string) (map[string]string, error) {
columeMetas, err := db.GetColumns(&Table{name: table}, nil)
+ columnMap := make(map[string]string)
if err != nil {
- return err
+ return columnMap, err
}
var pks []string
var columns []string
@@ -127,9 +130,11 @@ func createTable(starrocks *sql.DB, db dal.Dal,
starrocksTable string, table str
name := cm.Name()
starrocksDatatype, ok := cm.ColumnType()
if !ok {
- return fmt.Errorf("Get [%s] ColumeType Failed", name)
+ return columnMap, fmt.Errorf("Get [%s] ColumeType
Failed", name)
}
- column := fmt.Sprintf("`%s` %s", name,
getDataType(starrocksDatatype))
+ dataType := getDataType(starrocksDatatype)
+ columnMap[name] = dataType
+ column := fmt.Sprintf("`%s` %s", name, dataType)
columns = append(columns, column)
isPrimaryKey, ok := cm.PrimaryKey()
if isPrimaryKey && ok {
@@ -150,10 +155,10 @@ func createTable(starrocks *sql.DB, db dal.Dal,
starrocksTable string, table str
tableSql := fmt.Sprintf("create table if not exists `%s` ( %s ) %s",
starrocksTable, strings.Join(columns, ","), extra)
c.GetLogger().Info(tableSql)
_, err = starrocks.Exec(tableSql)
- return err
+ return columnMap, err
}
-func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable string,
table string, db dal.Dal, config *StarRocksConfig) error {
+func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable string,
table string, columnMap map[string]string, db dal.Dal, config *StarRocksConfig)
error {
offset := 0
starrocksTmpTable := starrocksTable + "_tmp"
// create tmp table in starrocks
@@ -177,7 +182,14 @@ func loadData(starrocks *sql.DB, c core.SubTaskContext,
starrocksTable string, t
columns := make([]interface{}, len(cols))
columnPointers := make([]interface{}, len(cols))
for i := range columns {
- columnPointers[i] = &columns[i]
+ dataType := columnMap[cols[i]]
+ if strings.HasPrefix(dataType, "array") {
+ var arr []string
+ columns[i] = &arr
+ columnPointers[i] = pq.Array(&arr)
+ } else {
+ columnPointers[i] = &columns[i]
+ }
}
err = rows.Scan(columnPointers...)
if err != nil {
diff --git a/plugins/starrocks/utils.go b/plugins/starrocks/utils.go
index a68c79e5..60833277 100644
--- a/plugins/starrocks/utils.go
+++ b/plugins/starrocks/utils.go
@@ -16,7 +16,10 @@ limitations under the License.
*/
package main
-import "strings"
+import (
+ "fmt"
+ "strings"
+)
func getTablesByDomainLayer(domainLayer string) []string {
switch domainLayer {
@@ -101,6 +104,8 @@ func getDataType(dataType string) string {
starrocksDatatype = "json"
} else if dataType == "uuid" {
starrocksDatatype = "char(36)"
+ } else if strings.HasSuffix(dataType, "[]") {
+ starrocksDatatype = fmt.Sprintf("array<%s>",
getDataType(strings.Split(dataType, "[]")[0]))
}
return starrocksDatatype
}