This is an automated email from the ASF dual-hosted git repository.

hez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new d0cd663be [fix-4772]: Python progress update crashes Go DevLake server (#4815)
d0cd663be is described below

commit d0cd663bea84c033d35af9febac02991b1147a3f
Author: Keon Amini <[email protected]>
AuthorDate: Thu Mar 30 20:04:40 2023 -0500

    [fix-4772]: Python progress update crashes Go DevLake server (#4815)
    
    * fix: Fix for panic in Go if Python returns a remote progress via FdOut
    
    * fix: Workaround for Pycharm debugger getting stuck
    
    * test: Have Python fakeplugin return more data so we can test this fix
    
    * fix: Progress counter logic fix
---
 backend/core/utils/ipc.go                          |  8 +++++--
 backend/go.mod                                     |  2 +-
 backend/python/pydevlake/pydevlake/__init__.py     |  6 +++++
 .../python/pydevlake/pydevlake/helpers/debugger.py |  6 ++---
 backend/python/pydevlake/pydevlake/ipc.py          |  3 ---
 backend/python/pydevlake/pydevlake/subtasks.py     | 20 ++++++++++++-----
 backend/python/test/fakeplugin/fakeplugin/main.py  | 26 +++++++++++++---------
 backend/server/services/remote/bridge/bridge.go    |  2 +-
 8 files changed, 46 insertions(+), 27 deletions(-)

diff --git a/backend/core/utils/ipc.go b/backend/core/utils/ipc.go
index 241fbd77a..4a0f814bf 100644
--- a/backend/core/utils/ipc.go
+++ b/backend/core/utils/ipc.go
@@ -243,7 +243,9 @@ func scanOutputPipe(pipe io.ReadCloser, wg *sync.WaitGroup, onReceive func([]byt
                        src := scanner.Bytes()
                        data := make([]byte, len(src))
                        copy(data, src)
-                       onReceive(data)
+                       if onReceive != nil {
+                               onReceive(data)
+                       }
                        outboundChannel <- responseCreator(data)
                }
                wg.Done()
@@ -259,7 +261,9 @@ func scanErrorPipe(pipe io.ReadCloser, onReceive func([]byte), outboundChannel c
                        src := scanner.Bytes()
                        data := make([]byte, len(src))
                        copy(data, src)
-                       onReceive(data)
+                       if onReceive != nil {
+                               onReceive(data)
+                       }
                        outboundChannel <- &ProcessResponse{stderr: data}
                        _, _ = remoteErrorMsg.Write(src)
                        _, _ = remoteErrorMsg.WriteString("\n")
diff --git a/backend/go.mod b/backend/go.mod
index 38ddbd22b..95370a7a4 100644
--- a/backend/go.mod
+++ b/backend/go.mod
@@ -68,7 +68,7 @@ require (
        github.com/go-openapi/swag v0.21.1 // indirect
        github.com/go-playground/locales v0.14.0 // indirect
        github.com/go-playground/universal-translator v0.18.0 // indirect
-       github.com/go-sql-driver/mysql v1.6.0
+       github.com/go-sql-driver/mysql v1.6.0 // indirect
        github.com/gogo/googleapis v1.4.1 // indirect
        github.com/gogo/protobuf v1.3.2 // indirect
        github.com/gogo/status v1.1.0 // indirect
diff --git a/backend/python/pydevlake/pydevlake/__init__.py b/backend/python/pydevlake/pydevlake/__init__.py
index 5374e7bbe..237c345d5 100644
--- a/backend/python/pydevlake/pydevlake/__init__.py
+++ b/backend/python/pydevlake/pydevlake/__init__.py
@@ -22,3 +22,9 @@ from .message import RemoteScopeGroup
 from .plugin import Plugin, ScopeTxRulePair
 from .stream import DomainType, Stream, Substream
 from .context import Context
+
+# the debugger hangs on startup during plugin registration (reason unknown), hence this workaround
+import sys
+if not sys.argv.__contains__('startup'):
+    from pydevlake.helpers import debugger
+    debugger.init()
diff --git a/backend/python/pydevlake/pydevlake/helpers/debugger.py b/backend/python/pydevlake/pydevlake/helpers/debugger.py
index 78d8666b9..528528f39 100644
--- a/backend/python/pydevlake/pydevlake/helpers/debugger.py
+++ b/backend/python/pydevlake/pydevlake/helpers/debugger.py
@@ -18,7 +18,7 @@ import os
 from pydevlake import logger
 
 
-def __start__():
+def init():
     debugger = os.getenv("USE_PYTHON_DEBUGGER", default="").lower()
     if debugger == "":
         return
@@ -32,12 +32,10 @@ def __start__():
             import pydevd_pycharm as pydevd
             try:
                 pydevd.settrace(host=host, port=port, suspend=False, stdoutToServer=True, stderrToServer=True)
+                logger.info("Pycharm remote debugger successfully connected")
             except TimeoutError as e:
                 logger.error(f"Failed to connect to pycharm debugger on {host}:{port}. Make sure it is running")
         except ImportError as e:
             logger.error("Pycharm debugger library is not installed")
     else:
         logger.error(f"Unsupported Python debugger specified: {debugger}")
-
-
-__start__()
diff --git a/backend/python/pydevlake/pydevlake/ipc.py b/backend/python/pydevlake/pydevlake/ipc.py
index f28c3af67..b75895ce1 100644
--- a/backend/python/pydevlake/pydevlake/ipc.py
+++ b/backend/python/pydevlake/pydevlake/ipc.py
@@ -24,9 +24,6 @@ from pydevlake.message import Message
 
 
 def plugin_method(func):
-    # keep this to enable debugging, and don't move this elsewhere or it can cause crashes if it gets called during the bootstrap process.
-    # noinspection PyUnresolvedReferences
-    from pydevlake.helpers import debugger
 
     def open_send_channel() -> TextIO:
         fd = 3
diff --git a/backend/python/pydevlake/pydevlake/subtasks.py b/backend/python/pydevlake/pydevlake/subtasks.py
index d7f1fe59b..15b84e92f 100644
--- a/backend/python/pydevlake/pydevlake/subtasks.py
+++ b/backend/python/pydevlake/pydevlake/subtasks.py
@@ -55,20 +55,28 @@ class Subtask:
                 state = dict()
 
             try:
-                for i, (data, state) in enumerate(self.fetch(state, session, ctx)):
+                records = self.fetch(state, session, ctx)
+                progress = last_progress = 0
+                for data, state in records:
+                    progress += 1
                     self.process(data, session, ctx)
-
-                    if i % sync_point_interval == 0 and i != 0:
+                    if progress % sync_point_interval == 0:
                         # Save current state
                         subtask_run.state = json.dumps(state)
                         session.merge(subtask_run)
                         session.commit()
-
                         # Send progress
                         yield RemoteProgress(
                             increment=sync_point_interval,
-                            current=i
+                            current=progress
                         )
+                        last_progress = progress
+                # Send final progress
+                if progress != last_progress:
+                    yield RemoteProgress(
+                        increment=progress-last_progress,
+                        current=progress
+                    )
             except Exception as e:
                 logger.error(f'{type(e).__name__}: {e}')
                 raise e
@@ -99,7 +107,7 @@ class Subtask:
         pass
 
     @abstractmethod
-    def process(self, data: object, session: Session):
+    def process(self, data: object, session: Session, ctx: Context):
         """
         Called for all data entries returned by `fetch`.
         """
diff --git a/backend/python/test/fakeplugin/fakeplugin/main.py b/backend/python/test/fakeplugin/fakeplugin/main.py
index f1903c91f..ac6ac5308 100644
--- a/backend/python/test/fakeplugin/fakeplugin/main.py
+++ b/backend/python/test/fakeplugin/fakeplugin/main.py
@@ -23,7 +23,6 @@ from sqlmodel import Field
 from pydevlake import Plugin, Connection, TransformationRule, Stream, ToolModel, ToolScope, RemoteScopeGroup, DomainType
 from pydevlake.domain_layer.devops import CicdScope, CICDPipeline, CICDStatus, CICDResult, CICDType
 
-
 VALID_TOKEN = "this_is_a_valid_token"
 
 
@@ -40,18 +39,12 @@ class FakePipeline(ToolModel, table=True):
     state: State
 
 
-class FakeStream(Stream):
+class FakePipelineStream(Stream):
     tool_model = FakePipeline
     domain_types = [DomainType.CICD]
 
-    fake_pipelines = [
-        FakePipeline(id=1, state=FakePipeline.State.SUCCESS, started_at=datetime(2023, 1, 10, 11, 0, 0), finished_at=datetime(2023, 1, 10, 11, 3, 0)),
-        FakePipeline(id=2, state=FakePipeline.State.FAILURE, started_at=datetime(2023, 1, 10, 12, 0, 0), finished_at=datetime(2023, 1, 10, 12, 1, 30)),
-        FakePipeline(id=1, state=FakePipeline.State.PENDING),
-    ]
-
     def collect(self, state, context):
-        for p in self.fake_pipelines:
+        for p in self.generate_fake_pipelines():
             yield json.loads(p.json()), {}
 
     def convert(self, pipeline: FakePipeline, ctx):
@@ -90,6 +83,19 @@ class FakeStream(Stream):
             return (pipeline.finished_at - pipeline.started_at).seconds
         return None
 
+    @classmethod
+    def generate_fake_pipelines(cls) -> list[FakePipeline]:
+        states = [FakePipeline.State.SUCCESS, FakePipeline.State.FAILURE, FakePipeline.State.PENDING]
+        fake_pipelines = []
+        for i in range(250):
+            fake_pipelines.append(FakePipeline(
+                id=i,
+                state=states[i % len(states)],
+                started_at=datetime(2023, 1, 10, 11, 0, 0, microsecond=i),
+                finished_at=datetime(2023, 1, 10, 11, 3, 0, microsecond=i),
+            ))
+        return fake_pipelines
+
 
 class FakeConnection(Connection):
     token: str
@@ -149,7 +155,7 @@ class FakePlugin(Plugin):
     @property
     def streams(self):
         return [
-            FakeStream
+            FakePipelineStream
         ]
 
 
diff --git a/backend/server/services/remote/bridge/bridge.go b/backend/server/services/remote/bridge/bridge.go
index d526494d8..727ef7e2c 100644
--- a/backend/server/services/remote/bridge/bridge.go
+++ b/backend/server/services/remote/bridge/bridge.go
@@ -53,7 +53,7 @@ func (b *Bridge) RemoteSubtaskEntrypointHandler(subtaskMeta models.SubtaskMeta)
                        if err != nil {
                                return err
                        }
-                       if progress.Current != 0 {
+                       if progress.Total != 0 {
                                ctx.SetProgress(progress.Current, progress.Total)
                        } else if progress.Increment != 0 {
                                ctx.IncProgress(progress.Increment)

Reply via email to