This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-streamloader.git
The following commit(s) were added to refs/heads/master by this push:
new c0cd939 [fix]avoid label conflict when setting labels in headers. (#25)
c0cd939 is described below
commit c0cd939bae6dcb06f3aa14e0a8e6cff473c15f5a
Author: Petrichor <[email protected]>
AuthorDate: Mon Feb 17 14:50:39 2025 +0800
[fix]avoid label conflict when setting labels in headers. (#25)
---
build.sh | 2 ++
loader/stream_loader.go | 24 +++++++++++++++++++-----
reader/reader.go | 4 ++--
3 files changed, 23 insertions(+), 7 deletions(-)
diff --git a/build.sh b/build.sh
index f1bc7b9..5bfd021 100755
--- a/build.sh
+++ b/build.sh
@@ -19,6 +19,8 @@
ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)"
rm -rf version.go
+# Formatting Go code with gofmt
+find . -name '*.go' -exec gofmt -w {} \;
go generate
go build
echo "Build success. Output: ${ROOT}/doris-streamloader"
diff --git a/loader/stream_loader.go b/loader/stream_loader.go
index 936a190..80840c4 100644
--- a/loader/stream_loader.go
+++ b/loader/stream_loader.go
@@ -18,11 +18,13 @@
package loader
import (
+ "doris-streamloader/report"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
+ "strconv"
"strings"
"sync"
"sync/atomic"
@@ -31,7 +33,6 @@ import (
"github.com/pierrec/lz4/v4"
log "github.com/sirupsen/logrus"
- "doris-streamloader/report"
)
type StreamLoadOption struct {
@@ -143,7 +144,7 @@ func (s *StreamLoad) createUrl() string {
}
// stream load create http request with string data
-func (s *StreamLoad) createRequest(url string, reader io.Reader) (req
*http.Request, err error) {
+func (s *StreamLoad) createRequest(url string, reader io.Reader, workerIndex
int, taskIndex int) (req *http.Request, err error) {
req, err = http.NewRequest("PUT", url, reader)
if err != nil {
return
@@ -155,6 +156,19 @@ func (s *StreamLoad) createRequest(url string, reader io.Reader) (req *http.Requ
req.Header.Set("Content-Type", "text/plain")
for k, v := range s.headers {
req.Header.Set(k, v)
+ // If a label has already been set in the headers, to prevent conflicts,
+ //generate a unique label by combining the original label, worker index, and task index.
+ if k == "label" {
+ var builder strings.Builder
+ builder.WriteString(v)
+ builder.WriteString("_")
+ builder.WriteString(strconv.Itoa(workerIndex))
+ builder.WriteString("_")
+ builder.WriteString(strconv.Itoa(taskIndex))
+
+ req.Header.Set("label", builder.String())
+ }
+
}
if s.Compress {
@@ -228,10 +242,10 @@ func (s *StreamLoad) readData(isEOS *atomic.Bool, rawWriter *io.PipeWriter, read
}
}
-func (s *StreamLoad) send(url string, reader io.Reader) (*http.Response,
error) {
+func (s *StreamLoad) send(url string, reader io.Reader, workerIndex int,
taskIndex int) (*http.Response, error) {
realUrl := url
for {
- req, err := s.createRequest(realUrl, reader)
+ req, err := s.createRequest(realUrl, reader, workerIndex,
taskIndex)
if err != nil {
if req == nil {
return nil, err
@@ -347,7 +361,7 @@ func (s *StreamLoad) executeGetAndSend(maxRowsPerTask int, maxBytesPerTask int,
workerIndex: workerIndex,
taskIndex: taskIndex,
})
- if resp, err := s.send(url, NopCloser(pr)); err != nil {
+ if resp, err := s.send(url, NopCloser(pr), workerIndex,
taskIndex); err != nil {
s.handleSendError(workerIndex, taskIndex)
log.Errorf("Send error, resp: %v error message: %v",
resp, err)
return
diff --git a/reader/reader.go b/reader/reader.go
index 634b9f0..2dcd2a5 100644
--- a/reader/reader.go
+++ b/reader/reader.go
@@ -19,6 +19,8 @@ package file
import (
"bufio"
+ "doris-streamloader/loader"
+ "doris-streamloader/report"
"io"
"os"
"path/filepath"
@@ -28,8 +30,6 @@ import (
"time"
log "github.com/sirupsen/logrus"
- report "doris-streamloader/report"
- loader "doris-streamloader/loader"
)
type FileReader struct {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]