This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 8bc1f1a75 refactor: refactor StatefulApiExtractor to fix busy buffer 
and EOF errors (#8405)
8bc1f1a75 is described below

commit 8bc1f1a7574885bd305953baec1a3da3c1679c2a
Author: Caio Queiroz <[email protected]>
AuthorDate: Tue Apr 29 02:36:06 2025 -0300

    refactor: refactor StatefulApiExtractor to fix busy buffer and EOF errors 
(#8405)
    
    * refactor: api extractor execute function to fetch one full raw record 
at a time
    
    * refactor: use pluck to get ids instead of a cursor
---
 .../pluginhelper/api/api_extractor_stateful.go     | 32 +++++++++++++---------
 1 file changed, 19 insertions(+), 13 deletions(-)

diff --git a/backend/helpers/pluginhelper/api/api_extractor_stateful.go 
b/backend/helpers/pluginhelper/api/api_extractor_stateful.go
index 43a30d501..0e474787f 100644
--- a/backend/helpers/pluginhelper/api/api_extractor_stateful.go
+++ b/backend/helpers/pluginhelper/api/api_extractor_stateful.go
@@ -102,7 +102,9 @@ func (extractor *StatefulApiExtractor[InputType]) Execute() 
errors.Error {
        if !db.HasTable(table) {
                return nil
        }
+
        clauses := []dal.Clause{
+               dal.Select("id"),
                dal.From(table),
                dal.Where("params = ?", params),
                dal.Orderby("id ASC"),
@@ -116,17 +118,20 @@ func (extractor *StatefulApiExtractor[InputType]) 
Execute() errors.Error {
        }
        clauses = append(clauses, dal.Where("created_at < ? ", 
extractor.GetUntil()))
 
+       // first get total count for progress tracking
        count, err := db.Count(clauses...)
        if err != nil {
-               return errors.Default.Wrap(err, "error getting count of 
clauses")
+               return errors.Default.Wrap(err, "error getting count of 
records")
        }
-       cursor, err := db.Cursor(clauses...)
+       logger.Info("get data from %s where params=%s and got %d with clauses 
%+v", table, params, count, clauses)
+
+       // get all IDs
+       var ids []uint64
+       err = db.Pluck("id", &ids, clauses...)
        if err != nil {
-               return errors.Default.Wrap(err, "error running DB query")
+               return errors.Default.Wrap(err, "error getting IDs")
        }
-       logger.Info("get data from %s where params=%s and got %d with clauses 
%+v", table, params, count, clauses)
 
-       defer cursor.Close()
        // batch save divider
        divider := NewBatchSaveDivider(extractor.SubTaskContext, 
extractor.GetBatchSize(), table, params)
        divider.SetIncrementalMode(extractor.IsIncremental())
@@ -134,17 +139,20 @@ func (extractor *StatefulApiExtractor[InputType]) 
Execute() errors.Error {
        // progress
        extractor.SetProgress(0, -1)
        ctx := extractor.GetContext()
-       // iterate all rows
-       for cursor.Next() {
+
+       // process each record individually by ID
+       for _, id := range ids {
                select {
                case <-ctx.Done():
                        return errors.Convert(ctx.Err())
                default:
                }
+
+               // load full record by ID
                row := &RawData{}
-               err = db.Fetch(cursor, row)
+               err := db.First(row, dal.From(table), dal.Where("id = ?", id))
                if err != nil {
-                       return errors.Default.Wrap(err, "error fetching row")
+                       return errors.Default.Wrap(err, "error loading full row 
by ID")
                }
 
                body := new(InputType)
@@ -164,6 +172,7 @@ func (extractor *StatefulApiExtractor[InputType]) Execute() 
errors.Error {
                if err != nil {
                        return errors.Default.Wrap(err, "error calling plugin 
Extract implementation")
                }
+
                for _, result := range results {
                        // get the batch operator for the specific type
                        batch, err := divider.ForType(reflect.TypeOf(result))
@@ -184,16 +193,13 @@ func (extractor *StatefulApiExtractor[InputType]) 
Execute() errors.Error {
                }
                extractor.IncProgress(1)
        }
-       if err := cursor.Err(); err != nil {
-               return errors.Default.Wrap(err, "error occurred during database 
cursor iteration in StatefulApiExtractor")
-       }
 
        // save the last batches
        err = divider.Close()
        if err != nil {
                return err
        }
-       // save the incremantal state
+       // save the incremental state
        return extractor.SubtaskStateManager.Close()
 }
 

Reply via email to