This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 685d0cba5 [fix-5689]: Python schema migrations must be fully manual 
(#5746)
685d0cba5 is described below

commit 685d0cba5b9dfcb29b9d4ce3a606ee6024b8ae0e
Author: Keon Amini <[email protected]>
AuthorDate: Tue Aug 1 13:17:38 2023 +0330

    [fix-5689]: Python schema migrations must be fully manual (#5746)
    
    * fix: Python schema migrations now all fully manual
    
    * feat: support ignoring migrations with old versions
    
    * refactor: move DynamicModelInfo into new file to eliminate duplication 
and circular imports
    
    * fix: fix python models
    
    * fix: CreateTable Op now just checks if the table exists
    
    * docs: Updated python readme
    
    * fix: fix bad postgres syntax in python
---
 backend/python/README.md                           |  27 +++-
 .../plugins/azuredevops/azuredevops/migrations.py  | 165 +++++++++++++++++++++
 .../plugins/azuredevops/azuredevops/models.py      |  38 +----
 backend/python/pydevlake/pydevlake/__init__.py     |   2 +-
 backend/python/pydevlake/pydevlake/message.py      |  27 +---
 backend/python/pydevlake/pydevlake/migration.py    |  19 ++-
 backend/python/pydevlake/pydevlake/model.py        |   5 +-
 backend/python/pydevlake/pydevlake/model_info.py   |  41 +++++
 backend/python/pydevlake/pydevlake/plugin.py       |   9 +-
 backend/python/test/fakeplugin/fakeplugin/main.py  |  43 +-----
 .../test/fakeplugin/fakeplugin/migrations.py       |  29 ++++
 .../python/test/fakeplugin/fakeplugin/models.py    |  47 ++++++
 backend/server/services/remote/init.go             |   4 -
 .../server/services/remote/models/conversion.go    |  17 ++-
 backend/server/services/remote/models/migration.go |  79 ++++++----
 .../server/services/remote/models/plugin_remote.go |   2 -
 .../server/services/remote/plugin/plugin_impl.go   |  30 ----
 17 files changed, 405 insertions(+), 179 deletions(-)

diff --git a/backend/python/README.md b/backend/python/README.md
index 912f84d76..f909b7916 100644
--- a/backend/python/README.md
+++ b/backend/python/README.md
@@ -237,23 +237,38 @@ To facilitate or even eliminate extraction, your tool 
models should be close to
 #### Migration of tool models
 
 Tool models, connection, scope and scope config types are stored in the 
DevLake database.
-When you change the definition of one of those types, the database needs to be 
migrated.
-Automatic migration takes care of most modifications, but some changes require 
manual migration. For example, automatic migration never drops columns. Another 
example is adding a column to the primary key of a table, you need to write a 
script that remove the primary key constraint and add a new compound primary 
key.
+When you create or change the definition of one of those types, the database 
needs to be migrated. This requires that
+you write migration code for them. Important: When you write a migration for a 
creation operation, the model must be a SNAPSHOT of the models you're intending
+to migrate; don't directly use the actual model because that may change over 
time. Instead, define a model that is a copy of the main one, and use that in 
the
migration - this model's code will never change (hence, it's a snapshot).
+Also, keep in mind that Python only supports writing schema migrations. If 
your flow requires data migrations as well, at this time, the code needs to
+be written in Go. See [this Go 
package](https://github.com/apache/incubator-devlake/tree/main/backend/server/services/remote/models/migrationscripts)
 for example.
 
 To declare a new migration script, you decorate a function with the 
`migration` decorator. The function name should describe what the script does. 
The `migration` decorator takes a version number that should be a 14 digits 
timestamp in the format `YYYYMMDDhhmmss`. The function takes a 
`MigrationScriptBuilder` as a parameter. This builder exposes methods to 
execute migration operations.
 
 ##### Migration operations
 
-The `MigrationScriptBuilder` exposes the following methods:
+The `MigrationScriptBuilder` exposes several methods. Here we list a few:
 - `execute(sql: str, dialect: Optional[Dialect])`: execute a raw SQL 
statement. The `dialect` parameter is used to execute the SQL statement only if 
the database is of the given dialect. If `dialect` is `None`, the statement is 
executed unconditionally.
 - `drop_column(table: str, column: str)`: drop a column from a table
 - `drop_table(table: str)`: drop a table
 
+Example of creating tables via migrations:
+```python
+
+@migration(20230501000001, name="initialize schemas for Plugin")
+def init_schemas(b: MigrationScriptBuilder):
+    class PluginConnection(Connection):
+        token: SecretStr
+        organization: Optional[str]
+    b.create_tables(PluginConnection)
+```
+
 
 ```python
 from pydevlake.migration import MigrationScriptBuilder, migration, Dialect
 
-@migration(20230524181430)
+@migration(20230524181430, name="add pk to Job table")
 def add_build_id_as_job_primary_key(b: MigrationScriptBuilder):
     table = Job.__tablename__
     b.execute(f'ALTER TABLE {table} DROP PRIMARY KEY', Dialect.MYSQL)
@@ -296,7 +311,9 @@ Most of the time, you will convert a tool model into a 
single domain model, but
 
 The `collect` method takes a `state` dictionary and a context object and 
yields tuples of raw data and new state.
 The last state that the plugin yielded for a given connection will be reused 
during the next collection.
-The plugin can use this `state` to store information necessary to perform 
incremental collection of data.
+The plugin can use this `state` to store information necessary to perform 
incremental collection of data. This operates
+independently of the way Go manages state, and is tracked by the table 
`_pydevlake_subtask_runs`. See [this 
issue](https://github.com/apache/incubator-devlake/issues/4880)
+for a proposed improvement to this feature.
 
 The `extract` method takes a raw data object and returns a tool model.
 This method has a default implementation that populates an instance of the 
`tool_model` class with the raw data.
diff --git a/backend/python/plugins/azuredevops/azuredevops/migrations.py 
b/backend/python/plugins/azuredevops/azuredevops/migrations.py
new file mode 100644
index 000000000..615bb2049
--- /dev/null
+++ b/backend/python/plugins/azuredevops/azuredevops/migrations.py
@@ -0,0 +1,165 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import re
+from enum import Enum
+from typing import Optional
+
+from pydantic import SecretStr
+
+from pydevlake import ToolModel, Connection, Field
+from pydevlake.migration import migration, Dialect, MigrationScriptBuilder
+from pydevlake.model import Model, ToolTable
+from pydevlake.pipeline_tasks import RefDiffOptions
+
+
+@migration(20230501000001, name="initialize schemas for Azure Devops")
+def init_schemas(b: MigrationScriptBuilder):
+    class AzureDevOpsConnection(Connection):
+        token: SecretStr
+        organization: Optional[str]
+
+    class AzureDevopsTransformationRule(ToolTable, Model):
+        name: str = Field(default="default")
+        refdiff: Optional[RefDiffOptions]
+        deployment_pattern: Optional[re.Pattern]
+        production_pattern: Optional[re.Pattern]
+
+    class GitRepository(ToolModel):
+        id: str = Field(primary_key=True)
+        name: str
+        url: str
+        remote_url: Optional[str]
+        default_branch: Optional[str]
+        project_id: str
+        org_id: str
+        parent_repository_url: Optional[str]
+        provider: Optional[str]
+
+    class GitPullRequest(ToolModel):
+        class PRStatus(Enum):
+            Abandoned = "abandoned"
+            Active = "active"
+            Completed = "completed"
+
+        pull_request_id: int = Field(primary_key=True)
+        description: Optional[str]
+        status: PRStatus
+        created_by_id: str
+        created_by_name: str
+        creation_date: datetime.datetime
+        closed_date: Optional[datetime.datetime]
+        source_commit_sha: str
+        target_commit_sha: str
+        merge_commit_sha: Optional[str]
+        url: Optional[str]
+        type: Optional[str]
+        title: Optional[str]
+        target_ref_name: Optional[str]
+        source_ref_name: Optional[str]
+        fork_repo_id: Optional[str]
+
+    class GitPullRequestCommit(ToolModel):
+        commit_id: str = Field(primary_key=True)
+        pull_request_id: str
+        author_name: str
+        author_email: str
+        author_date: datetime.datetime
+
+    class Build(ToolModel):
+        class BuildStatus(Enum):
+            Cancelling = "cancelling"
+            Completed = "completed"
+            InProgress = "inProgress"
+            NotStarted = "notStarted"
+            Postponed = "postponed"
+
+        class BuildResult(Enum):
+            Canceled = "canceled"
+            Failed = "failed"
+            Non = "none"
+            PartiallySucceeded = "partiallySucceeded"
+            Succeeded = "succeeded"
+
+        id: int = Field(primary_key=True)
+        name: str
+        start_time: Optional[datetime.datetime]
+        finish_time: Optional[datetime.datetime]
+        status: BuildStatus
+        result: Optional[BuildResult]
+        source_branch: str
+        source_version: str
+
+    class Job(ToolModel):
+        class JobState(Enum):
+            Completed = "completed"
+            InProgress = "inProgress"
+            Pending = "pending"
+
+        class JobResult(Enum):
+            Abandoned = "abandoned"
+            Canceled = "canceled"
+            Failed = "failed"
+            Skipped = "skipped"
+            Succeeded = "succeeded"
+            SucceededWithIssues = "succeededWithIssues"
+
+        id: str = Field(primary_key=True)
+        build_id: str
+        name: str
+        start_time: Optional[datetime.datetime]
+        finish_time: Optional[datetime.datetime]
+        state: JobState
+        result: Optional[JobResult]
+
+    b.create_tables(
+        AzureDevOpsConnection,
+        AzureDevopsTransformationRule,
+        GitRepository,
+        GitPullRequestCommit,
+        GitPullRequest,
+        Build,
+        Job,
+    )
+
+
+@migration(20230524181430)
+def add_build_id_as_job_primary_key(b: MigrationScriptBuilder):
+    # NOTE: We can't add a column to the primary key of an existing table
+    # so we have to drop the primary key constraint first,
+    # which is done differently in MySQL and PostgreSQL,
+    # and then add the new composite primary key.
+    table = '_tool_azuredevops_jobs'
+    b.execute(f'ALTER TABLE {table} MODIFY COLUMN build_id VARCHAR(255)', 
Dialect.MYSQL)
+    b.execute(f'ALTER TABLE {table} ALTER COLUMN build_id TYPE VARCHAR(255)', 
Dialect.POSTGRESQL)
+    b.execute(f'ALTER TABLE {table} DROP PRIMARY KEY', Dialect.MYSQL)
+    b.execute(f'ALTER TABLE {table} DROP CONSTRAINT {table}_pkey', 
Dialect.POSTGRESQL)
+    b.execute(f'ALTER TABLE {table} ADD PRIMARY KEY (id, build_id)')
+
+
+@migration(20230606165630)
+def rename_tx_rule_table_to_scope_config(b: MigrationScriptBuilder):
+    b.rename_table('_tool_azuredevops_azuredevopstransformationrules', 
'_tool_azuredevops_gitrepositoryconfigs')
+
+
+@migration(20230607165630, name="add entities column to gitrepositoryconfig 
table")
+def add_entities_column_to_scope_config(b: MigrationScriptBuilder):
+    b.add_column('_tool_azuredevops_gitrepositoryconfigs', 'entities', 'json')
+
+
+@migration(20230630000001, name="populated _raw_data_table column for 
azuredevops git repos")
+def add_raw_data_params_table_to_scope(b: MigrationScriptBuilder):
+    b.execute(f'''UPDATE _tool_azuredevops_gitrepositories SET _raw_data_table 
= '_raw_azuredevops_scopes' WHERE 1=1''')
diff --git a/backend/python/plugins/azuredevops/azuredevops/models.py 
b/backend/python/plugins/azuredevops/azuredevops/models.py
index f327334a4..437b36301 100644
--- a/backend/python/plugins/azuredevops/azuredevops/models.py
+++ b/backend/python/plugins/azuredevops/azuredevops/models.py
@@ -14,16 +14,17 @@
 # limitations under the License.
 
 import datetime
+import re
 from enum import Enum
 from typing import Optional
-import re
 
 from pydantic import SecretStr
-
-from pydevlake import Field, Connection, ScopeConfig
-from pydevlake.model import ToolModel, ToolScope
+from pydevlake import ScopeConfig, Field
+from pydevlake.model import ToolScope, ToolModel, Connection
 from pydevlake.pipeline_tasks import RefDiffOptions
-from pydevlake.migration import migration, MigrationScriptBuilder, Dialect
+
+# needed to be able to run migrations
+import azuredevops.migrations
 
 
 class AzureDevOpsConnection(Connection):
@@ -128,30 +129,3 @@ class Job(ToolModel, table=True):
     finish_time: Optional[datetime.datetime]
     state: JobState
     result: Optional[JobResult]
-
-
-@migration(20230524181430)
-def add_build_id_as_job_primary_key(b: MigrationScriptBuilder):
-    # NOTE: We can't add a column to the primary key of an existing table
-    # so we have to drop the primary key constraint first,
-    # which is done differently in MySQL and PostgreSQL,
-    # and then add the new composite primary key.
-    table = Job.__tablename__
-    b.execute(f'ALTER TABLE {table} DROP PRIMARY KEY', Dialect.MYSQL)
-    b.execute(f'ALTER TABLE {table} DROP CONSTRAINT {table}_pkey', 
Dialect.POSTGRESQL)
-    b.execute(f'ALTER TABLE {table} ADD PRIMARY KEY (id, build_id)')
-
-
-@migration(20230606165630)
-def rename_tx_rule_table_to_scope_config(b: MigrationScriptBuilder):
-    b.rename_table('_tool_azuredevops_azuredevopstransformationrules', 
GitRepositoryConfig.__tablename__)
-
-
-@migration(20230607165630, name="add entities column to gitrepositoryconfig 
table")
-def add_entities_column_to_scope_config(b: MigrationScriptBuilder):
-    b.add_column(GitRepositoryConfig.__tablename__, 'entities', 'json')
-
-
-@migration(20230630000001, name="populated _raw_data_table column for 
azuredevops git repos")
-def add_raw_data_params_table_to_scope(b: MigrationScriptBuilder):
-    b.execute(f'''UPDATE {GitRepository.__tablename__} SET _raw_data_table = 
'_raw_azuredevops_scopes' WHERE 1=1''')
diff --git a/backend/python/pydevlake/pydevlake/__init__.py 
b/backend/python/pydevlake/pydevlake/__init__.py
index 5adc1ba9e..c3e759e6f 100644
--- a/backend/python/pydevlake/pydevlake/__init__.py
+++ b/backend/python/pydevlake/pydevlake/__init__.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from typing import Any, Optional
+from typing import Optional
 
 import pytest
 pytest.register_assert_rewrite('pydevlake.testing')
diff --git a/backend/python/pydevlake/pydevlake/message.py 
b/backend/python/pydevlake/pydevlake/message.py
index bf5a2d0cb..431be5bb4 100644
--- a/backend/python/pydevlake/pydevlake/message.py
+++ b/backend/python/pydevlake/pydevlake/message.py
@@ -17,11 +17,11 @@
 from typing import Optional
 
 from pydantic import BaseModel, Field
-import jsonref
 
 from pydevlake.model import ToolScope
 from pydevlake.migration import MigrationScript
 from pydevlake.api import Response
+from pydevlake.model_info import DynamicModelInfo
 
 
 class Message(BaseModel):
@@ -39,27 +39,6 @@ class SubtaskMeta(BaseModel):
     arguments: list[str] = None
 
 
-class DynamicModelInfo(Message):
-    json_schema: dict
-    table_name: str
-
-    @staticmethod
-    def from_model(model_class):
-        schema = model_class.schema(by_alias=True)
-        if 'definitions' in schema:
-            # Replace $ref with actual schema
-            schema = jsonref.replace_refs(schema, proxies=False)
-            del schema['definitions']
-        # Pydantic forgets to put type in enums
-        for prop in schema['properties'].values():
-            if 'type' not in prop and 'enum' in prop:
-                prop['type'] = 'string'
-        return DynamicModelInfo(
-            json_schema=schema,
-            table_name=model_class.__tablename__
-        )
-
-
 class PluginInfo(Message):
     name: str
     description: str
@@ -87,8 +66,8 @@ class PipelineTask(Message):
 
 
 class DynamicDomainScope(Message):
-       type_name: str
-       data: bytes
+    type_name: str
+    data: bytes
 
 
 class PipelineData(Message):
diff --git a/backend/python/pydevlake/pydevlake/migration.py 
b/backend/python/pydevlake/pydevlake/migration.py
index 375fe91b5..f6244c4d3 100644
--- a/backend/python/pydevlake/pydevlake/migration.py
+++ b/backend/python/pydevlake/pydevlake/migration.py
@@ -20,9 +20,11 @@ from datetime import datetime
 
 from pydantic import BaseModel, Field
 
+from pydevlake.model_info import DynamicModelInfo
 
 MIGRATION_SCRIPTS = []
 
+
 class Dialect(Enum):
     MYSQL = "mysql"
     POSTGRESQL = "postgres"
@@ -65,8 +67,13 @@ class RenameTable(BaseModel):
     new_name: str
 
 
+class CreateTable(BaseModel):
+    type: Literal["create_table"] = "create_table"
+    model_info: DynamicModelInfo
+
+
 Operation = Annotated[
-    Union[Execute, AddColumn, DropColumn, RenameColumn, DropTable, 
RenameTable],
+    Union[Execute, AddColumn, DropColumn, RenameColumn, DropTable, 
RenameTable, CreateTable],
     Field(discriminator="type")
 ]
 
@@ -118,6 +125,13 @@ class MigrationScriptBuilder:
         """
         self.operations.append(RenameTable(old_name=old_name, 
new_name=new_name))
 
+    def create_tables(self, *model_classes):
+        """
+        Creates a table if it doesn't exist based on the object's fields.
+        """
+        for model_class in model_classes:
+            
self.operations.append(CreateTable(model_info=DynamicModelInfo.from_model(model_class)))
+
 
 def migration(version: int, name: Optional[str] = None):
     """
@@ -137,6 +151,7 @@ def migration(version: int, name: Optional[str] = None):
         script = MigrationScript(operations=builder.operations, 
version=version, name=name or fn.__name__)
         MIGRATION_SCRIPTS.append(script)
         return script
+
     return wrapper
 
 
@@ -148,4 +163,4 @@ def _validate_version(version: int):
     try:
         datetime.strptime(str_version, "%Y%m%d%H%M%S")
     except ValueError:
-        raise  err
+        raise err
diff --git a/backend/python/pydevlake/pydevlake/model.py 
b/backend/python/pydevlake/pydevlake/model.py
index 4b7f9bfcc..4f12ea794 100644
--- a/backend/python/pydevlake/pydevlake/model.py
+++ b/backend/python/pydevlake/pydevlake/model.py
@@ -26,7 +26,8 @@ from pydantic import AnyUrl, SecretStr, validator
 from sqlalchemy import Column, DateTime, Text
 from sqlalchemy.orm import declared_attr
 from sqlalchemy.inspection import inspect
-from sqlmodel import SQLModel, Field
+from sqlmodel import SQLModel
+from pydevlake import Field
 
 
 inflect_engine = inflect.engine()
@@ -86,6 +87,7 @@ class DomainType(Enum):
 class ScopeConfig(ToolTable, Model):
     name: str = Field(default="default")
     domain_types: list[DomainType] = Field(default=list(DomainType), 
alias="entities")
+    connection_id: Optional[int]
 
 
 class RawModel(SQLModel):
@@ -152,6 +154,7 @@ class DomainModel(NoPKModel):
 class ToolScope(ToolModel):
     id: str = Field(primary_key=True)
     name: str
+    scope_config_id: Optional[int]
 
 
 class DomainScope(DomainModel):
diff --git a/backend/python/pydevlake/pydevlake/model_info.py 
b/backend/python/pydevlake/pydevlake/model_info.py
new file mode 100644
index 000000000..dcab6c7d3
--- /dev/null
+++ b/backend/python/pydevlake/pydevlake/model_info.py
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import jsonref
+from pydantic import BaseModel
+
+
+class DynamicModelInfo(BaseModel):
+    json_schema: dict
+    table_name: str
+
+    class Config:
+        allow_population_by_field_name = True
+
+    @staticmethod
+    def from_model(model_class):
+        schema = model_class.schema(by_alias=True)
+        if 'definitions' in schema:
+            # Replace $ref with actual schema
+            schema = jsonref.replace_refs(schema, proxies=False)
+            del schema['definitions']
+        # Pydantic forgets to put type in enums
+        for prop in schema['properties'].values():
+            if 'type' not in prop and 'enum' in prop:
+                prop['type'] = 'string'
+        return DynamicModelInfo(
+            json_schema=schema,
+            table_name=model_class.__tablename__
+        )
diff --git a/backend/python/pydevlake/pydevlake/plugin.py 
b/backend/python/pydevlake/pydevlake/plugin.py
index c58331e8c..4d2f4993f 100644
--- a/backend/python/pydevlake/pydevlake/plugin.py
+++ b/backend/python/pydevlake/pydevlake/plugin.py
@@ -22,6 +22,7 @@ import sys
 import fire
 
 import pydevlake.message as msg
+import pydevlake.model_info
 from pydevlake.subtasks import Subtask
 from pydevlake.logger import logger
 from pydevlake.ipc import PluginCommands
@@ -233,10 +234,10 @@ class Plugin(ABC):
             description=self.description,
             plugin_path=self._plugin_path(),
             extension="datasource",
-            
connection_model_info=msg.DynamicModelInfo.from_model(self.connection_type),
-            
scope_model_info=msg.DynamicModelInfo.from_model(self.tool_scope_type),
-            
scope_config_model_info=msg.DynamicModelInfo.from_model(self.scope_config_type),
-            
tool_model_infos=[msg.DynamicModelInfo.from_model(stream.tool_model) for stream 
in self._streams.values()],
+            
connection_model_info=pydevlake.model_info.DynamicModelInfo.from_model(self.connection_type),
+            
scope_model_info=pydevlake.model_info.DynamicModelInfo.from_model(self.tool_scope_type),
+            
scope_config_model_info=pydevlake.model_info.DynamicModelInfo.from_model(self.scope_config_type),
+            
tool_model_infos=[pydevlake.model_info.DynamicModelInfo.from_model(stream.tool_model)
 for stream in self._streams.values()],
             subtask_metas=subtask_metas,
             migration_scripts=MIGRATION_SCRIPTS
         )
diff --git a/backend/python/test/fakeplugin/fakeplugin/main.py 
b/backend/python/test/fakeplugin/fakeplugin/main.py
index a3e1ab214..745c66ff2 100644
--- a/backend/python/test/fakeplugin/fakeplugin/main.py
+++ b/backend/python/test/fakeplugin/fakeplugin/main.py
@@ -13,34 +13,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from enum import Enum
-from datetime import datetime
-from typing import Optional
 import json
+from datetime import datetime
 
-from pydantic import SecretStr
-
-from pydevlake import Plugin, Connection, Stream, ToolModel, ToolScope, 
ScopeConfig, RemoteScopeGroup, DomainType, \
-    Field, TestConnectionResult
+from fakeplugin.models import FakeConnection, FakeProject, FakeScopeConfig, 
FakePipeline
+from pydevlake import Plugin, Stream, RemoteScopeGroup, DomainType, 
TestConnectionResult
 from pydevlake.domain_layer.devops import CicdScope, CICDPipeline, CICDStatus, 
CICDResult, CICDType
-from pydevlake.migration import migration, MigrationScriptBuilder, Dialect
 
 VALID_TOKEN = "this_is_a_valid_token"
 
 
-class FakePipeline(ToolModel, table=True):
-    class State(Enum):
-        PENDING = "pending"
-        RUNNING = "running"
-        FAILURE = "failure"
-        SUCCESS = "success"
-
-    id: str = Field(primary_key=True)
-    started_at: Optional[datetime]
-    finished_at: Optional[datetime]
-    state: State
-
-
 class FakePipelineStream(Stream):
     tool_model = FakePipeline
     domain_types = [DomainType.CICD]
@@ -94,18 +76,6 @@ class FakePipelineStream(Stream):
         return fake_pipelines
 
 
-class FakeConnection(Connection):
-    token: SecretStr
-
-
-class FakeProject(ToolScope, table=True):
-    url: str
-
-
-class FakeScopeConfig(ScopeConfig):
-    env: str
-
-
 class FakePlugin(Plugin):
     @property
     def connection_type(self):
@@ -167,12 +137,7 @@ class FakePlugin(Plugin):
         ]
 
 
-# test migration
-@migration(20230630000001, name="populated _raw_data_table column for 
fakeproject")
-def add_raw_data_params_table_to_scope(b: MigrationScriptBuilder):
-    b.execute(f'UPDATE {FakeProject.__tablename__} SET _raw_data_table = 
"_raw_fakeproject_scopes" WHERE 1=1', Dialect.MYSQL) #mysql only
-    b.execute(f'''UPDATE {FakeProject.__tablename__} SET _raw_data_table = 
'_raw_fakeproject_scopes' WHERE 1=1''', Dialect.POSTGRESQL) #mysql and postgres
-
+import fakeplugin.migrations  # NEEDED
 
 if __name__ == '__main__':
     FakePlugin.start()
diff --git a/backend/python/test/fakeplugin/fakeplugin/migrations.py 
b/backend/python/test/fakeplugin/fakeplugin/migrations.py
new file mode 100644
index 000000000..9e9439787
--- /dev/null
+++ b/backend/python/test/fakeplugin/fakeplugin/migrations.py
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from fakeplugin.models import FakeConnection, FakePipeline, FakeProject, 
FakeScopeConfig
+from pydevlake.migration import migration, MigrationScriptBuilder, Dialect
+
+
+@migration(20230530000001, name="init schemas")
+def init_schemas(b: MigrationScriptBuilder):
+    b.create_tables(FakeConnection, FakePipeline, FakeProject, FakeScopeConfig)
+
+
+# test migration
+@migration(20230630000001, name="populated _raw_data_table column for 
fakeproject")
+def add_raw_data_params_table_to_scope(b: MigrationScriptBuilder):
+    b.execute(f'UPDATE {FakeProject.__tablename__} SET _raw_data_table = 
"_raw_fakeproject_scopes" WHERE 1=1', Dialect.MYSQL) #mysql only
+    b.execute(f'''UPDATE {FakeProject.__tablename__} SET _raw_data_table = 
'_raw_fakeproject_scopes' WHERE 1=1''', Dialect.POSTGRESQL) #mysql and postgres
\ No newline at end of file
diff --git a/backend/python/test/fakeplugin/fakeplugin/models.py 
b/backend/python/test/fakeplugin/fakeplugin/models.py
new file mode 100644
index 000000000..16f543dd8
--- /dev/null
+++ b/backend/python/test/fakeplugin/fakeplugin/models.py
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from datetime import datetime
+from enum import Enum
+from typing import Optional
+
+from pydantic import SecretStr
+
+from pydevlake import ScopeConfig, ToolScope, Connection, ToolModel, Field
+
+
+class FakeConnection(Connection):
+    token: SecretStr
+
+
+class FakeProject(ToolScope, table=True):
+    url: str
+
+
+class FakeScopeConfig(ScopeConfig):
+    env: str
+
+
+class FakePipeline(ToolModel, table=True):
+    class State(Enum):
+        PENDING = "pending"
+        RUNNING = "running"
+        FAILURE = "failure"
+        SUCCESS = "success"
+
+    id: str = Field(primary_key=True)
+    started_at: Optional[datetime]
+    finished_at: Optional[datetime]
+    state: State
diff --git a/backend/server/services/remote/init.go 
b/backend/server/services/remote/init.go
index 4c6dd8ff1..5005b3cbc 100644
--- a/backend/server/services/remote/init.go
+++ b/backend/server/services/remote/init.go
@@ -43,10 +43,6 @@ func NewRemotePlugin(info *models.PluginInfo) 
(models.RemotePlugin, errors.Error
        if err != nil {
                return nil, err
        }
-       err = plugin.RunAutoMigrations()
-       if err != nil {
-               return nil, err
-       }
        err = pluginCore.RegisterPlugin(info.Name, plugin)
        if err != nil {
                return nil, err
diff --git a/backend/server/services/remote/models/conversion.go 
b/backend/server/services/remote/models/conversion.go
index 51c2e55e2..f5ec727ba 100644
--- a/backend/server/services/remote/models/conversion.go
+++ b/backend/server/services/remote/models/conversion.go
@@ -33,7 +33,11 @@ import (
 )
 
 func LoadTableModel(tableName string, schema utils.JsonObject, parentModel 
any) (models.DynamicTabler, errors.Error) {
-       structType, err := GenerateStructType(schema, 
reflect.TypeOf(parentModel))
+       var baseType reflect.Type = nil
+       if parentModel != nil {
+               baseType = reflect.TypeOf(parentModel)
+       }
+       structType, err := GenerateStructType(schema, baseType)
        if err != nil {
                return nil, err
        }
@@ -60,7 +64,7 @@ func GenerateStructType(schema utils.JsonObject, baseType 
reflect.Type) (reflect
                structFields = append(structFields, anonymousField)
        }
        for k, v := range props {
-               if isBaseTypeField(k, baseType) {
+               if baseType != nil && isBaseTypeField(k, baseType) {
                        continue
                }
                spec := v.(utils.JsonObject)
@@ -136,6 +140,7 @@ var (
        stringType  = reflect.TypeOf("")
        timeType    = reflect.TypeOf(time.Time{})
        jsonMapType = reflect.TypeOf(datatypes.JSONMap{})
+       jsonType    = reflect.TypeOf(datatypes.JSON{})
 )
 
 func generateStructField(name string, schema utils.JsonObject, required bool) 
(*reflect.StructField, errors.Error) {
@@ -156,11 +161,11 @@ func generateStructField(name string, schema 
utils.JsonObject, required bool) (*
 }
 
 func getGoType(schema utils.JsonObject, required bool) (reflect.Type, 
errors.Error) {
-       jsonType, ok := schema["type"].(string)
+       rawType, ok := schema["type"].(string)
        if !ok {
                return nil, errors.BadInput.New("\"type\" property must be a 
string")
        }
-       switch jsonType {
+       switch rawType {
        //TODO: support more types
        case "integer":
                return int64Type, nil
@@ -179,10 +184,12 @@ func getGoType(schema utils.JsonObject, required bool) 
(reflect.Type, errors.Err
                } else {
                        return stringType, nil
                }
+       case "array":
+               return jsonType, nil
        case "object":
                return jsonMapType, nil
        default:
-               return nil, errors.BadInput.New(fmt.Sprintf("Unsupported type 
%s", jsonType))
+               return nil, errors.BadInput.New(fmt.Sprintf("Unsupported type 
%s", rawType))
        }
 }
 
diff --git a/backend/server/services/remote/models/migration.go 
b/backend/server/services/remote/models/migration.go
index 1862bf5da..c06b1d22f 100644
--- a/backend/server/services/remote/models/migration.go
+++ b/backend/server/services/remote/models/migration.go
@@ -19,15 +19,16 @@ package models
 
 import (
        "encoding/json"
-
        "github.com/apache/incubator-devlake/core/context"
        "github.com/apache/incubator-devlake/core/dal"
        "github.com/apache/incubator-devlake/core/errors"
+       "github.com/apache/incubator-devlake/core/models/common"
        "github.com/apache/incubator-devlake/core/plugin"
+       "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
 )
 
 type Operation interface {
-       Execute(dal.Dal) errors.Error
+       Execute(basicRes context.BasicRes) errors.Error
 }
 
 type ExecuteOperation struct {
@@ -35,14 +36,15 @@ type ExecuteOperation struct {
        Dialect *string `json:"dialect"`
 }
 
-func (o ExecuteOperation) Execute(dal dal.Dal) errors.Error {
+func (o ExecuteOperation) Execute(basicRes context.BasicRes) errors.Error {
+       db := basicRes.GetDal()
        if o.Dialect != nil {
-               if dal.Dialect() == *o.Dialect {
-                       return dal.Exec(o.Sql)
+               if db.Dialect() == *o.Dialect {
+                       return db.Exec(o.Sql)
                }
                return nil
        } else {
-               return dal.Exec(o.Sql)
+               return db.Exec(o.Sql)
        }
 }
 
@@ -54,14 +56,12 @@ type AddColumnOperation struct {
        ColumnType dal.ColumnType `json:"column_type"`
 }
 
-func (o AddColumnOperation) Execute(dal dal.Dal) errors.Error {
-       if dal.HasColumn(o.Table, o.Column) {
-               err := dal.DropColumns(o.Table, o.Column)
-               if err != nil {
-                       return err
-               }
+func (o AddColumnOperation) Execute(basicRes context.BasicRes) errors.Error {
+       db := basicRes.GetDal()
+       if db.HasColumn(o.Table, o.Column) {
+               return nil
        }
-       return dal.AddColumn(o.Table, o.Column, o.ColumnType)
+       return db.AddColumn(o.Table, o.Column, o.ColumnType)
 }
 
 type DropColumnOperation struct {
@@ -69,9 +69,10 @@ type DropColumnOperation struct {
        Column string `json:"column"`
 }
 
-func (o DropColumnOperation) Execute(dal dal.Dal) errors.Error {
-       if dal.HasColumn(o.Table, o.Column) {
-               return dal.DropColumns(o.Table, o.Column)
+func (o DropColumnOperation) Execute(basicRes context.BasicRes) errors.Error {
+       db := basicRes.GetDal()
+       if db.HasColumn(o.Table, o.Column) {
+               return db.DropColumns(o.Table, o.Column)
        }
        return nil
 }
@@ -83,9 +84,10 @@ type DropTableOperation struct {
        Column string `json:"column"`
 }
 
-func (o DropTableOperation) Execute(dal dal.Dal) errors.Error {
-       if dal.HasTable(o.Table) {
-               return dal.DropTables(o.Table)
+func (o DropTableOperation) Execute(basicRes context.BasicRes) errors.Error {
+       db := basicRes.GetDal()
+       if db.HasTable(o.Table) {
+               return db.DropTables(o.Table)
        }
        return nil
 }
@@ -97,17 +99,33 @@ type RenameTableOperation struct {
        NewName string `json:"new_name"`
 }
 
-func (o RenameTableOperation) Execute(dal dal.Dal) errors.Error {
-       if !dal.HasTable(o.OldName) {
+func (o RenameTableOperation) Execute(basicRes context.BasicRes) errors.Error {
+       db := basicRes.GetDal()
+       if !db.HasTable(o.OldName) {
                return nil
        }
-       if dal.HasTable(o.NewName) {
-               err := dal.DropTables(o.NewName)
-               if err != nil {
-                       return err
-               }
+       return db.RenameTable(o.OldName, o.NewName)
+}
+
+type CreateTableOperation struct {
+       ModelInfo *DynamicModelInfo `json:"model_info"`
+}
+
+func (o CreateTableOperation) Execute(basicRes context.BasicRes) errors.Error {
+       db := basicRes.GetDal()
+       if db.HasTable(o.ModelInfo.TableName) {
+               basicRes.GetLogger().Warn(nil, "table %s already exists. It 
won't be created.", o.ModelInfo.TableName)
+               return nil
        }
-       return dal.RenameTable(o.OldName, o.NewName)
+       model, err := o.ModelInfo.LoadDynamicTabler(common.NoPKModel{})
+       if err != nil {
+               return err
+       }
+       err = api.CallDB(db.AutoMigrate, model.New())
+       if err != nil {
+               return err
+       }
+       return nil
 }
 
 var _ Operation = (*RenameTableOperation)(nil)
@@ -135,7 +153,7 @@ func (s *RemoteMigrationScript) UnmarshalJSON(data []byte) 
error {
        s.operations = make([]Operation, len(rawScript.Operations))
        for i, operationRaw := range rawScript.Operations {
                operationMap := make(map[string]interface{})
-               err := json.Unmarshal(operationRaw, &operationMap)
+               err = json.Unmarshal(operationRaw, &operationMap)
                if err != nil {
                        return err
                }
@@ -152,6 +170,8 @@ func (s *RemoteMigrationScript) UnmarshalJSON(data []byte) 
error {
                        operation = &DropTableOperation{}
                case "rename_table":
                        operation = &RenameTableOperation{}
+               case "create_table":
+                       operation = &CreateTableOperation{}
                default:
                        return errors.BadInput.New("unsupported operation type")
                }
@@ -165,9 +185,8 @@ func (s *RemoteMigrationScript) UnmarshalJSON(data []byte) 
error {
 }
 
 func (s *RemoteMigrationScript) Up(basicRes context.BasicRes) errors.Error {
-       db := basicRes.GetDal()
        for _, operation := range s.operations {
-               err := operation.Execute(db)
+               err := operation.Execute(basicRes)
                if err != nil {
                        return err
                }
diff --git a/backend/server/services/remote/models/plugin_remote.go 
b/backend/server/services/remote/models/plugin_remote.go
index 358d89a96..988283630 100644
--- a/backend/server/services/remote/models/plugin_remote.go
+++ b/backend/server/services/remote/models/plugin_remote.go
@@ -18,7 +18,6 @@ limitations under the License.
 package models
 
 import (
-       "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/plugin"
 )
 
@@ -31,5 +30,4 @@ type RemotePlugin interface {
        plugin.PluginModel
        plugin.PluginMigration
        plugin.PluginSource
-       RunAutoMigrations() errors.Error
 }
diff --git a/backend/server/services/remote/plugin/plugin_impl.go 
b/backend/server/services/remote/plugin/plugin_impl.go
index 1e6c66d1c..7ced36f9d 100644
--- a/backend/server/services/remote/plugin/plugin_impl.go
+++ b/backend/server/services/remote/plugin/plugin_impl.go
@@ -230,36 +230,6 @@ func (p *remotePluginImpl) ApiResources() 
map[string]map[string]plugin.ApiResour
        return p.resources
 }
 
-func (p *remotePluginImpl) RunAutoMigrations() errors.Error {
-       err := p.createTable(p.connectionTabler.New())
-       if err != nil {
-               return err
-       }
-       err = p.createTable(p.scopeTabler.New())
-       if err != nil {
-               return err
-       }
-       err = p.createTable(p.scopeConfigTabler.New())
-       if err != nil {
-               return err
-       }
-       for _, toolModelTabler := range p.toolModelTablers {
-               err = p.createTable(toolModelTabler.New())
-               if err != nil {
-                       return err
-               }
-       }
-       return nil
-}
-
-func (p *remotePluginImpl) createTable(tbl coreModels.DynamicTabler) 
errors.Error {
-       db := basicRes.GetDal()
-       if db.HasTable(tbl.TableName()) {
-               return nil
-       }
-       return api.CallDB(db.AutoMigrate, tbl)
-}
-
 func (p *remotePluginImpl) OpenApiSpec() string {
        return p.openApiSpec
 }


Reply via email to