This is an automated email from the ASF dual-hosted git repository.
warren pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new e8877a41 feat: add be_host and fix hash columns
e8877a41 is described below
commit e8877a41c5cda7db8d181ef6926c8a605369045e
Author: Jinlong Peng <[email protected]>
AuthorDate: Fri Aug 19 16:08:22 2022 +0800
feat: add be_host and fix hash columns
---
.../src/data/pipeline-config-samples/starrocks.js | 1 +
plugins/starrocks/starrocks.go | 7 +++-
plugins/starrocks/task_data.go | 5 ++-
plugins/starrocks/tasks.go | 46 +++++++++++++++-------
4 files changed, 42 insertions(+), 17 deletions(-)
diff --git a/config-ui/src/data/pipeline-config-samples/starrocks.js b/config-ui/src/data/pipeline-config-samples/starrocks.js
index aadb152f..40631d2b 100644
--- a/config-ui/src/data/pipeline-config-samples/starrocks.js
+++ b/config-ui/src/data/pipeline-config-samples/starrocks.js
@@ -25,6 +25,7 @@ const starRocksConfig = [
user: 'root',
password: '',
database: 'lake',
+ be_host: '',
be_port: 8040,
tables: ['_tool_.*'], // support regexp
batch_size: 10000,
diff --git a/plugins/starrocks/starrocks.go b/plugins/starrocks/starrocks.go
index f22903de..eb3f6109 100644
--- a/plugins/starrocks/starrocks.go
+++ b/plugins/starrocks/starrocks.go
@@ -6,7 +6,7 @@ The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
+ http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
@@ -37,6 +37,9 @@ func (s StarRocks) PrepareTaskData(taskCtx core.TaskContext, options map[string]
if err != nil {
return nil, err
}
+ if op.BeHost == "" {
+ op.BeHost = op.Host
+ }
return &op, nil
}
@@ -61,6 +64,7 @@ func main() {
_ = cmd.MarkFlagRequired("port")
port := cmd.Flags().StringP("port", "p", "", "StarRocks port")
_ = cmd.MarkFlagRequired("port")
+ beHost := cmd.Flags().StringP("be_host", "BH", "", "StarRocks be host")
bePort := cmd.Flags().StringP("be_port", "BP", "", "StarRocks be port")
_ = cmd.MarkFlagRequired("user")
user := cmd.Flags().StringP("user", "u", "", "StarRocks user")
@@ -81,6 +85,7 @@ func main() {
"user": user,
"password": password,
"database": database,
+ "be_host": beHost,
"be_port": bePort,
"tables": tables,
"batch_size": batchSize,
diff --git a/plugins/starrocks/task_data.go b/plugins/starrocks/task_data.go
index 00c0a982..76f4b1b4 100644
--- a/plugins/starrocks/task_data.go
+++ b/plugins/starrocks/task_data.go
@@ -6,7 +6,7 @@ The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
+ http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
@@ -22,7 +22,8 @@ type StarRocksConfig struct {
User string
Password string
Database string
- BePort int `mapstructure:"be_port"`
+ BeHost string `mapstructure:"be_host"`
+ BePort int `mapstructure:"be_port"`
Tables []string
BatchSize int `mapstructure:"batch_size"`
DomainLayer string `mapstructure:"domain_layer"`
diff --git a/plugins/starrocks/tasks.go b/plugins/starrocks/tasks.go
index 4f776f65..3bf9cfd3 100644
--- a/plugins/starrocks/tasks.go
+++ b/plugins/starrocks/tasks.go
@@ -21,8 +21,9 @@ import (
"database/sql"
"encoding/json"
"fmt"
- "io/ioutil"
+ "io"
"net/http"
+ "net/url"
"regexp"
"strings"
@@ -96,7 +97,7 @@ func createTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, table str
if err != nil {
return err
}
- var pks string
+ var pks []string
var columns []string
firstcm := ""
for _, cm := range columeMetas {
@@ -109,22 +110,19 @@ func createTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, table str
columns = append(columns, column)
isPrimaryKey, ok := cm.PrimaryKey()
if isPrimaryKey && ok {
- if pks != "" {
- pks += ","
- }
- pks += name
+ pks = append(pks, fmt.Sprintf("`%s`", name))
}
if firstcm == "" {
- firstcm = name
+ firstcm = fmt.Sprintf("`%s`", name)
}
}
- if pks == "" {
- pks = firstcm
+ if len(pks) == 0 {
+ pks = append(pks, firstcm)
}
if extra == "" {
- extra = fmt.Sprintf(`engine=olap distributed by hash(%s)
properties("replication_num" = "1")`, pks)
+ extra = fmt.Sprintf(`engine=olap distributed by hash(%s)
properties("replication_num" = "1")`, strings.Join(pks, ", "))
}
tableSql := fmt.Sprintf("create table if not exists `%s` ( %s ) %s",
starrocksTable, strings.Join(columns, ","), extra)
c.GetLogger().Info(tableSql)
@@ -172,7 +170,7 @@ func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable string, t
break
}
// insert data to tmp table
- url := fmt.Sprintf("http://%s:%d/api/%s/%s/_stream_load",
config.Host, config.BePort, config.Database, starrocksTmpTable)
+ loadURL := fmt.Sprintf("http://%s:%d/api/%s/%s/_stream_load",
config.BeHost, config.BePort, config.Database, starrocksTmpTable)
headers := map[string]string{
"format": "json",
"strip_outer_array": "true",
@@ -183,8 +181,12 @@ func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable string, t
if err != nil {
return err
}
- client := http.Client{}
- req, err := http.NewRequest(http.MethodPut, url,
bytes.NewBuffer(jsonData))
+ client := http.Client{
+ CheckRedirect: func(req *http.Request, via
[]*http.Request) error {
+ return http.ErrUseLastResponse
+ },
+ }
+ req, err := http.NewRequest(http.MethodPut, loadURL,
bytes.NewBuffer(jsonData))
if err != nil {
return err
}
@@ -193,10 +195,26 @@ func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable string, t
req.Header.Set(k, v)
}
resp, err := client.Do(req)
+ if err != nil && err != http.ErrUseLastResponse {
+ return err
+ }
+ if err == http.ErrUseLastResponse {
+ var location *url.URL
+ location, err = resp.Location()
+ req, err = http.NewRequest(http.MethodPut,
location.String(), bytes.NewBuffer(jsonData))
+ if err != nil {
+ return err
+ }
+ req.SetBasicAuth(config.User, config.Password)
+ for k, v := range headers {
+ req.Header.Set(k, v)
+ }
+ resp, err = client.Do(req)
+ }
if err != nil {
return err
}
- b, err := ioutil.ReadAll(resp.Body)
+ b, err := io.ReadAll(resp.Body)
if err != nil {
return err
}