This is an automated email from the ASF dual-hosted git repository.
klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 8bc1f1a75 refactor: refactor StatefulApiExtractor to fix busy buffer
and EOF errors (#8405)
8bc1f1a75 is described below
commit 8bc1f1a7574885bd305953baec1a3da3c1679c2a
Author: Caio Queiroz <[email protected]>
AuthorDate: Tue Apr 29 02:36:06 2025 -0300
refactor: refactor StatefulApiExtractor to fix busy buffer and EOF errors
(#8405)
* refactor: api extractor execute function to fetch one full raw record at a
time
* refactor: use pluck to get IDs instead of a cursor
---
.../pluginhelper/api/api_extractor_stateful.go | 32 +++++++++++++---------
1 file changed, 19 insertions(+), 13 deletions(-)
diff --git a/backend/helpers/pluginhelper/api/api_extractor_stateful.go
b/backend/helpers/pluginhelper/api/api_extractor_stateful.go
index 43a30d501..0e474787f 100644
--- a/backend/helpers/pluginhelper/api/api_extractor_stateful.go
+++ b/backend/helpers/pluginhelper/api/api_extractor_stateful.go
@@ -102,7 +102,9 @@ func (extractor *StatefulApiExtractor[InputType]) Execute()
errors.Error {
if !db.HasTable(table) {
return nil
}
+
clauses := []dal.Clause{
+ dal.Select("id"),
dal.From(table),
dal.Where("params = ?", params),
dal.Orderby("id ASC"),
@@ -116,17 +118,20 @@ func (extractor *StatefulApiExtractor[InputType])
Execute() errors.Error {
}
clauses = append(clauses, dal.Where("created_at < ? ",
extractor.GetUntil()))
+ // first get total count for progress tracking
count, err := db.Count(clauses...)
if err != nil {
- return errors.Default.Wrap(err, "error getting count of
clauses")
+ return errors.Default.Wrap(err, "error getting count of
records")
}
- cursor, err := db.Cursor(clauses...)
+ logger.Info("get data from %s where params=%s and got %d with clauses
%+v", table, params, count, clauses)
+
+ // get all IDs
+ var ids []uint64
+ err = db.Pluck("id", &ids, clauses...)
if err != nil {
- return errors.Default.Wrap(err, "error running DB query")
+ return errors.Default.Wrap(err, "error getting IDs")
}
- logger.Info("get data from %s where params=%s and got %d with clauses
%+v", table, params, count, clauses)
- defer cursor.Close()
// batch save divider
divider := NewBatchSaveDivider(extractor.SubTaskContext,
extractor.GetBatchSize(), table, params)
divider.SetIncrementalMode(extractor.IsIncremental())
@@ -134,17 +139,20 @@ func (extractor *StatefulApiExtractor[InputType])
Execute() errors.Error {
// progress
extractor.SetProgress(0, -1)
ctx := extractor.GetContext()
- // iterate all rows
- for cursor.Next() {
+
+ // process each record individually by ID
+ for _, id := range ids {
select {
case <-ctx.Done():
return errors.Convert(ctx.Err())
default:
}
+
+ // load full record by ID
row := &RawData{}
- err = db.Fetch(cursor, row)
+ err := db.First(row, dal.From(table), dal.Where("id = ?", id))
if err != nil {
- return errors.Default.Wrap(err, "error fetching row")
+ return errors.Default.Wrap(err, "error loading full row
by ID")
}
body := new(InputType)
@@ -164,6 +172,7 @@ func (extractor *StatefulApiExtractor[InputType]) Execute()
errors.Error {
if err != nil {
return errors.Default.Wrap(err, "error calling plugin
Extract implementation")
}
+
for _, result := range results {
// get the batch operator for the specific type
batch, err := divider.ForType(reflect.TypeOf(result))
@@ -184,16 +193,13 @@ func (extractor *StatefulApiExtractor[InputType])
Execute() errors.Error {
}
extractor.IncProgress(1)
}
- if err := cursor.Err(); err != nil {
- return errors.Default.Wrap(err, "error occurred during database
cursor iteration in StatefulApiExtractor")
- }
// save the last batches
err = divider.Close()
if err != nil {
return err
}
- // save the incremantal state
+ // save the incremental state
return extractor.SubtaskStateManager.Close()
}