xiaokang commented on code in PR #47691:
URL: https://github.com/apache/doris/pull/47691#discussion_r1966950088
##########
extension/beats/doris/client.go:
##########
@@ -184,124 +194,320 @@ func (client *client) Close() error {
}
func (client *client) String() string {
- str := fmt.Sprintf("doris{%s, %s, %s}", client.url, client.labelPrefix,
client.headers)
+ str := fmt.Sprintf("doris{%s, %s, %s}", client.url("{table}"),
client.labelPrefix, client.headers)
if _, ok := client.headers["Authorization"]; ok {
return strings.Replace(str,
"Authorization:"+client.headers["Authorization"], "Authorization:Basic ******",
1)
}
return str
}
-func (client *client) Publish(_ context.Context, batch publisher.Batch) error {
+func (client *client) url(table string) string {
+ return fmt.Sprintf("%s/%s/%s/_stream_load", client.urlPrefix,
client.db, table)
+}
+
+func (client *client) label(table string) string {
+ return fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix,
client.database, table, time.Now().UnixMilli(), uuid.New())
+}
+
+// Publish sends events to doris.
+// batch.Events() are grouped by table first (tableEvents).
+// For each tableEvents, call the http stream load api to send the tableEvents
to doris.
+// If a tableEvents returns an error, add a barrier to the first event of the
tableEvents.
+// A barrier contains a table, a stream load label, and the length of the
tableEvents.
+// Add all failed tableEvents to the retryEvents.
+// So if the first event in the batch.Events() has a barrier, it means that
this is a retry.
+// In this case, we will split the batch.Events() to some tableEvents by the
barrier events
+// and send each tableEvents to doris again reusing the label in the barrier.
+func (client *client) Publish(ctx context.Context, batch publisher.Batch)
error {
events := batch.Events()
length := len(events)
client.logger.Debugf("Received events: %d", length)
- label := fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix,
client.database, client.table, time.Now().UnixMilli(), uuid.New())
- rest, err := client.publishEvents(label, events)
+ tableEventsMap := client.makeTableEventsMap(ctx, events)
+ rest, err := client.publishEvents(tableEventsMap)
if len(rest) == 0 {
batch.ACK()
- client.logger.Debugf("Success send %d events", length)
} else {
- client.observer.Failed(length)
batch.RetryEvents(rest)
- client.logger.Warnf("Retry send %d events", length)
+ client.logger.Warnf("Retry send %d events", len(rest))
}
return err
}
-func (client *client) publishEvents(lable string, events []publisher.Event)
([]publisher.Event, error) {
+const nilTable = ""
+
+type Events struct {
+ Label string
+ Events []publisher.Event
+
+ // used in publishEvents
+ serialization string
+ dropped int64
+ request *http.Request
+ response *http.Response
+ err error
+}
+
+func (client *client) makeTableEventsMap(_ context.Context, events
[]publisher.Event) map[string]*Events {
+ tableEventsMap := make(map[string]*Events)
+ if len(events) == 0 {
+ return tableEventsMap
+ }
+
+ barrier, err := getBarrierFromEvent(&events[0])
+ if err == nil { // retry
+ if client.tableSelector.Sel.IsConst() { // table is const
Review Comment:
The if/else branches in the retry path can be combined.
##########
extension/beats/doris/client.go:
##########
@@ -184,124 +194,320 @@ func (client *client) Close() error {
}
func (client *client) String() string {
- str := fmt.Sprintf("doris{%s, %s, %s}", client.url, client.labelPrefix,
client.headers)
+ str := fmt.Sprintf("doris{%s, %s, %s}", client.url("{table}"),
client.labelPrefix, client.headers)
if _, ok := client.headers["Authorization"]; ok {
return strings.Replace(str,
"Authorization:"+client.headers["Authorization"], "Authorization:Basic ******",
1)
}
return str
}
-func (client *client) Publish(_ context.Context, batch publisher.Batch) error {
+func (client *client) url(table string) string {
+ return fmt.Sprintf("%s/%s/%s/_stream_load", client.urlPrefix,
client.db, table)
+}
+
+func (client *client) label(table string) string {
+ return fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix,
client.database, table, time.Now().UnixMilli(), uuid.New())
+}
+
+// Publish sends events to doris.
+// batch.Events() are grouped by table first (tableEvents).
+// For each tableEvents, call the http stream load api to send the tableEvents
to doris.
+// If a tableEvents returns an error, add a barrier to the first event of the
tableEvents.
+// A barrier contains a table, a stream load label, and the length of the
tableEvents.
+// Add all failed tableEvents to the retryEvents.
+// So if the first event in the batch.Events() has a barrier, it means that
this is a retry.
+// In this case, we will split the batch.Events() to some tableEvents by the
barrier events
+// and send each tableEvents to doris again reusing the label in the barrier.
+func (client *client) Publish(ctx context.Context, batch publisher.Batch)
error {
events := batch.Events()
length := len(events)
client.logger.Debugf("Received events: %d", length)
- label := fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix,
client.database, client.table, time.Now().UnixMilli(), uuid.New())
- rest, err := client.publishEvents(label, events)
+ tableEventsMap := client.makeTableEventsMap(ctx, events)
+ rest, err := client.publishEvents(tableEventsMap)
if len(rest) == 0 {
batch.ACK()
- client.logger.Debugf("Success send %d events", length)
} else {
- client.observer.Failed(length)
batch.RetryEvents(rest)
- client.logger.Warnf("Retry send %d events", length)
+ client.logger.Warnf("Retry send %d events", len(rest))
}
return err
}
-func (client *client) publishEvents(lable string, events []publisher.Event)
([]publisher.Event, error) {
+const nilTable = ""
+
+type Events struct {
+ Label string
+ Events []publisher.Event
+
+ // used in publishEvents
+ serialization string
+ dropped int64
+ request *http.Request
+ response *http.Response
+ err error
+}
+
+func (client *client) makeTableEventsMap(_ context.Context, events
[]publisher.Event) map[string]*Events {
+ tableEventsMap := make(map[string]*Events)
+ if len(events) == 0 {
+ return tableEventsMap
+ }
+
+ barrier, err := getBarrierFromEvent(&events[0])
+ if err == nil { // retry
Review Comment:
You should check `if barrier != nil` instead.
##########
extension/beats/doris/client.go:
##########
@@ -39,14 +41,16 @@ import (
)
type client struct {
- url string
+ urlPrefix string
+ db string
Review Comment:
This duplicates the existing `database` field.
##########
extension/beats/doris/client.go:
##########
@@ -184,124 +194,320 @@ func (client *client) Close() error {
}
func (client *client) String() string {
- str := fmt.Sprintf("doris{%s, %s, %s}", client.url, client.labelPrefix,
client.headers)
+ str := fmt.Sprintf("doris{%s, %s, %s}", client.url("{table}"),
client.labelPrefix, client.headers)
if _, ok := client.headers["Authorization"]; ok {
return strings.Replace(str,
"Authorization:"+client.headers["Authorization"], "Authorization:Basic ******",
1)
}
return str
}
-func (client *client) Publish(_ context.Context, batch publisher.Batch) error {
+func (client *client) url(table string) string {
+ return fmt.Sprintf("%s/%s/%s/_stream_load", client.urlPrefix,
client.db, table)
+}
+
+func (client *client) label(table string) string {
+ return fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix,
client.database, table, time.Now().UnixMilli(), uuid.New())
+}
+
+// Publish sends events to doris.
+// batch.Events() are grouped by table first (tableEvents).
+// For each tableEvents, call the http stream load api to send the tableEvents
to doris.
+// If a tableEvents returns an error, add a barrier to the first event of the
tableEvents.
+// A barrier contains a table, a stream load label, and the length of the
tableEvents.
+// Add all failed tableEvents to the retryEvents.
+// So if the first event in the batch.Events() has a barrier, it means that
this is a retry.
+// In this case, we will split the batch.Events() to some tableEvents by the
barrier events
+// and send each tableEvents to doris again reusing the label in the barrier.
+func (client *client) Publish(ctx context.Context, batch publisher.Batch)
error {
events := batch.Events()
length := len(events)
client.logger.Debugf("Received events: %d", length)
- label := fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix,
client.database, client.table, time.Now().UnixMilli(), uuid.New())
- rest, err := client.publishEvents(label, events)
+ tableEventsMap := client.makeTableEventsMap(ctx, events)
+ rest, err := client.publishEvents(tableEventsMap)
if len(rest) == 0 {
batch.ACK()
- client.logger.Debugf("Success send %d events", length)
} else {
- client.observer.Failed(length)
batch.RetryEvents(rest)
- client.logger.Warnf("Retry send %d events", length)
+ client.logger.Warnf("Retry send %d events", len(rest))
}
return err
}
-func (client *client) publishEvents(lable string, events []publisher.Event)
([]publisher.Event, error) {
+const nilTable = ""
+
+type Events struct {
+ Label string
+ Events []publisher.Event
+
+ // used in publishEvents
+ serialization string
+ dropped int64
+ request *http.Request
+ response *http.Response
+ err error
+}
+
+func (client *client) makeTableEventsMap(_ context.Context, events
[]publisher.Event) map[string]*Events {
+ tableEventsMap := make(map[string]*Events)
+ if len(events) == 0 {
+ return tableEventsMap
+ }
+
+ barrier, err := getBarrierFromEvent(&events[0])
+ if err == nil { // retry
Review Comment:
Change the branch order as follows to make it read more naturally:
```
if barrier == nil { // first time
} else { // retry
}
```
##########
extension/beats/doris/client.go:
##########
@@ -184,124 +194,320 @@ func (client *client) Close() error {
}
func (client *client) String() string {
- str := fmt.Sprintf("doris{%s, %s, %s}", client.url, client.labelPrefix,
client.headers)
+ str := fmt.Sprintf("doris{%s, %s, %s}", client.url("{table}"),
client.labelPrefix, client.headers)
if _, ok := client.headers["Authorization"]; ok {
return strings.Replace(str,
"Authorization:"+client.headers["Authorization"], "Authorization:Basic ******",
1)
}
return str
}
-func (client *client) Publish(_ context.Context, batch publisher.Batch) error {
+func (client *client) url(table string) string {
+ return fmt.Sprintf("%s/%s/%s/_stream_load", client.urlPrefix,
client.db, table)
+}
+
+func (client *client) label(table string) string {
+ return fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix,
client.database, table, time.Now().UnixMilli(), uuid.New())
+}
+
+// Publish sends events to doris.
+// batch.Events() are grouped by table first (tableEvents).
+// For each tableEvents, call the http stream load api to send the tableEvents
to doris.
+// If a tableEvents returns an error, add a barrier to the first event of the
tableEvents.
+// A barrier contains a table, a stream load label, and the length of the
tableEvents.
+// Add all failed tableEvents to the retryEvents.
+// So if the first event in the batch.Events() has a barrier, it means that
this is a retry.
+// In this case, we will split the batch.Events() to some tableEvents by the
barrier events
+// and send each tableEvents to doris again reusing the label in the barrier.
+func (client *client) Publish(ctx context.Context, batch publisher.Batch)
error {
events := batch.Events()
length := len(events)
client.logger.Debugf("Received events: %d", length)
- label := fmt.Sprintf("%s_%s_%s_%d_%s", client.labelPrefix,
client.database, client.table, time.Now().UnixMilli(), uuid.New())
- rest, err := client.publishEvents(label, events)
+ tableEventsMap := client.makeTableEventsMap(ctx, events)
+ rest, err := client.publishEvents(tableEventsMap)
if len(rest) == 0 {
batch.ACK()
- client.logger.Debugf("Success send %d events", length)
} else {
- client.observer.Failed(length)
batch.RetryEvents(rest)
- client.logger.Warnf("Retry send %d events", length)
+ client.logger.Warnf("Retry send %d events", len(rest))
}
return err
}
-func (client *client) publishEvents(lable string, events []publisher.Event)
([]publisher.Event, error) {
+const nilTable = ""
+
+type Events struct {
+ Label string
+ Events []publisher.Event
+
+ // used in publishEvents
+ serialization string
+ dropped int64
+ request *http.Request
+ response *http.Response
+ err error
+}
+
+func (client *client) makeTableEventsMap(_ context.Context, events
[]publisher.Event) map[string]*Events {
+ tableEventsMap := make(map[string]*Events)
+ if len(events) == 0 {
+ return tableEventsMap
+ }
+
+ barrier, err := getBarrierFromEvent(&events[0])
+ if err == nil { // retry
+ if client.tableSelector.Sel.IsConst() { // table is const
+ removeBarrierFromEvent(&events[0])
+ tableEventsMap[barrier.Table] = &Events{
+ Label: barrier.Label,
+ Events: events,
+ }
+ } else { // split events by barrier
+ for start := 0; start < len(events); {
+ barrier, _ :=
getBarrierFromEvent(&events[start])
+ removeBarrierFromEvent(&events[start])
+ end := start + barrier.Length
+
+ tableEventsMap[barrier.Table] = &Events{
+ Label: barrier.Label,
+ Events: events[start:end], // should
not do any append to the array, because here is a slice of the original array
+ }
+
+ start = end
+ }
+ }
+ } else { // first time
+ if client.tableSelector.Sel.IsConst() { // table is const
+ table, _ :=
client.tableSelector.Sel.Select(&events[0].Content)
+ label := client.label(table)
+ tableEventsMap[table] = &Events{
+ Label: label,
+ Events: events,
+ }
+ } else { // select table for each event
+ for _, e := range events {
+ table, err :=
client.tableSelector.Sel.Select(&e.Content)
+ if err != nil {
+ client.logger.Errorf("Failed to select
table: %+v", err)
+ }
+ if table == nilTable {
+ if client.defaultTable == nilTable {
+ client.logger.Warnf("table
format error, the default table is not set, the data will be dropped")
+ } else {
+ table = client.defaultTable
+ client.logger.Warnf("table
format error, use the default table: %s", client.defaultTable)
+ }
+ }
+ _, ok := tableEventsMap[table]
+ if !ok {
+ tableEventsMap[table] = &Events{
+ Label: client.label(table),
+ Events: []publisher.Event{e},
+ }
+ } else {
+ tableEventsMap[table].Events =
append(tableEventsMap[table].Events, e)
+ }
+ }
+ }
+ }
+
+ return tableEventsMap
+}
+
+func (client *client) publishEvents(tableEventsMap map[string]*Events)
([]publisher.Event, error) {
begin := time.Now()
- var logFirstEvent []byte
- var stringBuilder strings.Builder
+ for table, tableEvents := range tableEventsMap {
+ events := tableEvents.Events
+
+ if table == nilTable {
+ client.logger.Errorf("Invalid table for %v events",
len(events))
+ tableEvents.dropped = int64(len(events))
+ tableEvents.err = fmt.Errorf("invalid table for %v
events", len(events))
+ continue
+ }
+
+ var stringBuilder strings.Builder
- dropped := 0
- for i := range events {
- event := &events[i]
- serializedEvent, err := client.codec.Encode(client.beat.Beat,
&event.Content)
+ for i := range events {
+ event := &events[i]
+ serializedEvent, err :=
client.codec.Encode(client.beat.Beat, &event.Content)
- if err != nil {
- if event.Guaranteed() {
- client.logger.Errorf("Failed to serialize the
event: %+v", err)
- } else {
- client.logger.Warnf("Failed to serialize the
event: %+v", err)
+ if err != nil {
+ if event.Guaranteed() {
+ client.logger.Errorf("Failed to
serialize the event: %+v", err)
+ } else {
+ client.logger.Warnf("Failed to
serialize the event: %+v", err)
+ }
+ client.logger.Debugf("Failed event: %v", event)
+
+ tableEvents.dropped++
+ continue
}
- client.logger.Debugf("Failed event: %v", event)
- dropped++
- client.reporter.IncrFailedRows(1)
+ stringBuilder.Write(serializedEvent)
+ stringBuilder.WriteString(client.lineDelimiter)
+ }
+
+ tableEvents.serialization = stringBuilder.String()
Review Comment:
Do we no longer clear the events to save memory?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]