This is an automated email from the ASF dual-hosted git repository.
klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 685d0cba5 [fix-5689]: Python schema migrations must be fully manual
(#5746)
685d0cba5 is described below
commit 685d0cba5b9dfcb29b9d4ce3a606ee6024b8ae0e
Author: Keon Amini <[email protected]>
AuthorDate: Tue Aug 1 13:17:38 2023 +0330
[fix-5689]: Python schema migrations must be fully manual (#5746)
* fix: Python schema migrations now all fully manual
* feat: support ignoring migrations with old versions
* refactor: move DynamicModelInfo into new file to eliminate duplication
and circular imports
* fix: fix python models
* fix: CreateTable Op now just checks if the table exists
* docs: Updated python readme
* fix: fix bad postgres syntax in python
---
backend/python/README.md | 27 +++-
.../plugins/azuredevops/azuredevops/migrations.py | 165 +++++++++++++++++++++
.../plugins/azuredevops/azuredevops/models.py | 38 +----
backend/python/pydevlake/pydevlake/__init__.py | 2 +-
backend/python/pydevlake/pydevlake/message.py | 27 +---
backend/python/pydevlake/pydevlake/migration.py | 19 ++-
backend/python/pydevlake/pydevlake/model.py | 5 +-
backend/python/pydevlake/pydevlake/model_info.py | 41 +++++
backend/python/pydevlake/pydevlake/plugin.py | 9 +-
backend/python/test/fakeplugin/fakeplugin/main.py | 43 +-----
.../test/fakeplugin/fakeplugin/migrations.py | 29 ++++
.../python/test/fakeplugin/fakeplugin/models.py | 47 ++++++
backend/server/services/remote/init.go | 4 -
.../server/services/remote/models/conversion.go | 17 ++-
backend/server/services/remote/models/migration.go | 79 ++++++----
.../server/services/remote/models/plugin_remote.go | 2 -
.../server/services/remote/plugin/plugin_impl.go | 30 ----
17 files changed, 405 insertions(+), 179 deletions(-)
diff --git a/backend/python/README.md b/backend/python/README.md
index 912f84d76..f909b7916 100644
--- a/backend/python/README.md
+++ b/backend/python/README.md
@@ -237,23 +237,38 @@ To facilitate or even eliminate extraction, your tool
models should be close to
#### Migration of tool models
Tool models, connection, scope and scope config types are stored in the
DevLake database.
-When you change the definition of one of those types, the database needs to be
migrated.
-Automatic migration takes care of most modifications, but some changes require
manual migration. For example, automatic migration never drops columns. Another
example is adding a column to the primary key of a table, you need to write a
script that remove the primary key constraint and add a new compound primary
key.
+When you create or change the definition of one of those types, the database
needs to be migrated. This requires that
+you write migration code for them. Important: When you write a migration for a
creation operation, the model must be a SNAPSHOT of the model you're intending
+to migrate; don't directly use the actual model because that may change over
time. Instead, define a model that is a copy of the main one, and use that in
the
+migration - this model's code will never change (Hence, it's a snapshot).
+Also, keep in mind that Python only supports writing schema migrations. If
your flow requires data migrations as well, at this time, the code needs to
+be written in Go. See [this Go
package](https://github.com/apache/incubator-devlake/tree/main/backend/server/services/remote/models/migrationscripts)
for example.
To declare a new migration script, you decorate a function with the
`migration` decorator. The function name should describe what the script does.
The `migration` decorator takes a version number that should be a 14 digits
timestamp in the format `YYYYMMDDhhmmss`. The function takes a
`MigrationScriptBuilder` as a parameter. This builder exposes methods to
execute migration operations.
##### Migration operations
-The `MigrationScriptBuilder` exposes the following methods:
+The `MigrationScriptBuilder` exposes several methods. Here we list a few:
- `execute(sql: str, dialect: Optional[Dialect])`: execute a raw SQL
statement. The `dialect` parameter is used to execute the SQL statement only if
the database is of the given dialect. If `dialect` is `None`, the statement is
executed unconditionally.
- `drop_column(table: str, column: str)`: drop a column from a table
- `drop_table(table: str)`: drop a table
+Example of creating tables via migrations:
+```python
+
+@migration(20230501000001, name="initialize schemas for Plugin")
+def init_schemas(b: MigrationScriptBuilder):
+ class PluginConnection(Connection):
+ token: SecretStr
+ organization: Optional[str]
+ b.create_tables(PluginConnection)
+```
+
```python
from pydevlake.migration import MigrationScriptBuilder, migration, Dialect
-@migration(20230524181430)
+@migration(20230524181430, name="add pk to Job table")
def add_build_id_as_job_primary_key(b: MigrationScriptBuilder):
table = Job.__tablename__
b.execute(f'ALTER TABLE {table} DROP PRIMARY KEY', Dialect.MYSQL)
@@ -296,7 +311,9 @@ Most of the time, you will convert a tool model into a
single domain model, but
The `collect` method takes a `state` dictionary and a context object and
yields tuples of raw data and new state.
The last state that the plugin yielded for a given connection will be reused
during the next collection.
-The plugin can use this `state` to store information necessary to perform
incremental collection of data.
+The plugin can use this `state` to store information necessary to perform
incremental collection of data. This operates
+independently of the way Go manages state, and is tracked by the table
`_pydevlake_subtask_runs`. See [this
issue](https://github.com/apache/incubator-devlake/issues/4880)
+for a proposed improvement to this feature.
The `extract` method takes a raw data object and returns a tool model.
This method has a default implementation that populates an instance of the
`tool_model` class with the raw data.
diff --git a/backend/python/plugins/azuredevops/azuredevops/migrations.py
b/backend/python/plugins/azuredevops/azuredevops/migrations.py
new file mode 100644
index 000000000..615bb2049
--- /dev/null
+++ b/backend/python/plugins/azuredevops/azuredevops/migrations.py
@@ -0,0 +1,165 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import re
+from enum import Enum
+from typing import Optional
+
+from pydantic import SecretStr
+
+from pydevlake import ToolModel, Connection, Field
+from pydevlake.migration import migration, Dialect, MigrationScriptBuilder
+from pydevlake.model import Model, ToolTable
+from pydevlake.pipeline_tasks import RefDiffOptions
+
+
+@migration(20230501000001, name="initialize schemas for Azure Devops")
+def init_schemas(b: MigrationScriptBuilder):
+ class AzureDevOpsConnection(Connection):
+ token: SecretStr
+ organization: Optional[str]
+
+ class AzureDevopsTransformationRule(ToolTable, Model):
+ name: str = Field(default="default")
+ refdiff: Optional[RefDiffOptions]
+ deployment_pattern: Optional[re.Pattern]
+ production_pattern: Optional[re.Pattern]
+
+ class GitRepository(ToolModel):
+ id: str = Field(primary_key=True)
+ name: str
+ url: str
+ remote_url: Optional[str]
+ default_branch: Optional[str]
+ project_id: str
+ org_id: str
+ parent_repository_url: Optional[str]
+ provider: Optional[str]
+
+ class GitPullRequest(ToolModel):
+ class PRStatus(Enum):
+ Abandoned = "abandoned"
+ Active = "active"
+ Completed = "completed"
+
+ pull_request_id: int = Field(primary_key=True)
+ description: Optional[str]
+ status: PRStatus
+ created_by_id: str
+ created_by_name: str
+ creation_date: datetime.datetime
+ closed_date: Optional[datetime.datetime]
+ source_commit_sha: str
+ target_commit_sha: str
+ merge_commit_sha: Optional[str]
+ url: Optional[str]
+ type: Optional[str]
+ title: Optional[str]
+ target_ref_name: Optional[str]
+ source_ref_name: Optional[str]
+ fork_repo_id: Optional[str]
+
+ class GitPullRequestCommit(ToolModel):
+ commit_id: str = Field(primary_key=True)
+ pull_request_id: str
+ author_name: str
+ author_email: str
+ author_date: datetime.datetime
+
+ class Build(ToolModel):
+ class BuildStatus(Enum):
+ Cancelling = "cancelling"
+ Completed = "completed"
+ InProgress = "inProgress"
+ NotStarted = "notStarted"
+ Postponed = "postponed"
+
+ class BuildResult(Enum):
+ Canceled = "canceled"
+ Failed = "failed"
+ Non = "none"
+ PartiallySucceeded = "partiallySucceeded"
+ Succeeded = "succeeded"
+
+ id: int = Field(primary_key=True)
+ name: str
+ start_time: Optional[datetime.datetime]
+ finish_time: Optional[datetime.datetime]
+ status: BuildStatus
+ result: Optional[BuildResult]
+ source_branch: str
+ source_version: str
+
+ class Job(ToolModel):
+ class JobState(Enum):
+ Completed = "completed"
+ InProgress = "inProgress"
+ Pending = "pending"
+
+ class JobResult(Enum):
+ Abandoned = "abandoned"
+ Canceled = "canceled"
+ Failed = "failed"
+ Skipped = "skipped"
+ Succeeded = "succeeded"
+ SucceededWithIssues = "succeededWithIssues"
+
+ id: str = Field(primary_key=True)
+ build_id: str
+ name: str
+ start_time: Optional[datetime.datetime]
+ finish_time: Optional[datetime.datetime]
+ state: JobState
+ result: Optional[JobResult]
+
+ b.create_tables(
+ AzureDevOpsConnection,
+ AzureDevopsTransformationRule,
+ GitRepository,
+ GitPullRequestCommit,
+ GitPullRequest,
+ Build,
+ Job,
+ )
+
+
+@migration(20230524181430)
+def add_build_id_as_job_primary_key(b: MigrationScriptBuilder):
+ # NOTE: We can't add a column to the primary key of an existing table
+ # so we have to drop the primary key constraint first,
+ # which is done differently in MySQL and PostgreSQL,
+ # and then add the new composite primary key.
+ table = '_tool_azuredevops_jobs'
+ b.execute(f'ALTER TABLE {table} MODIFY COLUMN build_id VARCHAR(255)',
Dialect.MYSQL)
+ b.execute(f'ALTER TABLE {table} ALTER COLUMN build_id TYPE VARCHAR(255)',
Dialect.POSTGRESQL)
+ b.execute(f'ALTER TABLE {table} DROP PRIMARY KEY', Dialect.MYSQL)
+ b.execute(f'ALTER TABLE {table} DROP CONSTRAINT {table}_pkey',
Dialect.POSTGRESQL)
+ b.execute(f'ALTER TABLE {table} ADD PRIMARY KEY (id, build_id)')
+
+
+@migration(20230606165630)
+def rename_tx_rule_table_to_scope_config(b: MigrationScriptBuilder):
+ b.rename_table('_tool_azuredevops_azuredevopstransformationrules',
'_tool_azuredevops_gitrepositoryconfigs')
+
+
+@migration(20230607165630, name="add entities column to gitrepositoryconfig
table")
+def add_entities_column_to_scope_config(b: MigrationScriptBuilder):
+ b.add_column('_tool_azuredevops_gitrepositoryconfigs', 'entities', 'json')
+
+
+@migration(20230630000001, name="populated _raw_data_table column for
azuredevops git repos")
+def add_raw_data_params_table_to_scope(b: MigrationScriptBuilder):
+ b.execute(f'''UPDATE _tool_azuredevops_gitrepositories SET _raw_data_table
= '_raw_azuredevops_scopes' WHERE 1=1''')
diff --git a/backend/python/plugins/azuredevops/azuredevops/models.py
b/backend/python/plugins/azuredevops/azuredevops/models.py
index f327334a4..437b36301 100644
--- a/backend/python/plugins/azuredevops/azuredevops/models.py
+++ b/backend/python/plugins/azuredevops/azuredevops/models.py
@@ -14,16 +14,17 @@
# limitations under the License.
import datetime
+import re
from enum import Enum
from typing import Optional
-import re
from pydantic import SecretStr
-
-from pydevlake import Field, Connection, ScopeConfig
-from pydevlake.model import ToolModel, ToolScope
+from pydevlake import ScopeConfig, Field
+from pydevlake.model import ToolScope, ToolModel, Connection
from pydevlake.pipeline_tasks import RefDiffOptions
-from pydevlake.migration import migration, MigrationScriptBuilder, Dialect
+
+# needed to be able to run migrations
+import azuredevops.migrations
class AzureDevOpsConnection(Connection):
@@ -128,30 +129,3 @@ class Job(ToolModel, table=True):
finish_time: Optional[datetime.datetime]
state: JobState
result: Optional[JobResult]
-
-
-@migration(20230524181430)
-def add_build_id_as_job_primary_key(b: MigrationScriptBuilder):
- # NOTE: We can't add a column to the primary key of an existing table
- # so we have to drop the primary key constraint first,
- # which is done differently in MySQL and PostgreSQL,
- # and then add the new composite primary key.
- table = Job.__tablename__
- b.execute(f'ALTER TABLE {table} DROP PRIMARY KEY', Dialect.MYSQL)
- b.execute(f'ALTER TABLE {table} DROP CONSTRAINT {table}_pkey',
Dialect.POSTGRESQL)
- b.execute(f'ALTER TABLE {table} ADD PRIMARY KEY (id, build_id)')
-
-
-@migration(20230606165630)
-def rename_tx_rule_table_to_scope_config(b: MigrationScriptBuilder):
- b.rename_table('_tool_azuredevops_azuredevopstransformationrules',
GitRepositoryConfig.__tablename__)
-
-
-@migration(20230607165630, name="add entities column to gitrepositoryconfig
table")
-def add_entities_column_to_scope_config(b: MigrationScriptBuilder):
- b.add_column(GitRepositoryConfig.__tablename__, 'entities', 'json')
-
-
-@migration(20230630000001, name="populated _raw_data_table column for
azuredevops git repos")
-def add_raw_data_params_table_to_scope(b: MigrationScriptBuilder):
- b.execute(f'''UPDATE {GitRepository.__tablename__} SET _raw_data_table =
'_raw_azuredevops_scopes' WHERE 1=1''')
diff --git a/backend/python/pydevlake/pydevlake/__init__.py
b/backend/python/pydevlake/pydevlake/__init__.py
index 5adc1ba9e..c3e759e6f 100644
--- a/backend/python/pydevlake/pydevlake/__init__.py
+++ b/backend/python/pydevlake/pydevlake/__init__.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from typing import Any, Optional
+from typing import Optional
import pytest
pytest.register_assert_rewrite('pydevlake.testing')
diff --git a/backend/python/pydevlake/pydevlake/message.py
b/backend/python/pydevlake/pydevlake/message.py
index bf5a2d0cb..431be5bb4 100644
--- a/backend/python/pydevlake/pydevlake/message.py
+++ b/backend/python/pydevlake/pydevlake/message.py
@@ -17,11 +17,11 @@
from typing import Optional
from pydantic import BaseModel, Field
-import jsonref
from pydevlake.model import ToolScope
from pydevlake.migration import MigrationScript
from pydevlake.api import Response
+from pydevlake.model_info import DynamicModelInfo
class Message(BaseModel):
@@ -39,27 +39,6 @@ class SubtaskMeta(BaseModel):
arguments: list[str] = None
-class DynamicModelInfo(Message):
- json_schema: dict
- table_name: str
-
- @staticmethod
- def from_model(model_class):
- schema = model_class.schema(by_alias=True)
- if 'definitions' in schema:
- # Replace $ref with actual schema
- schema = jsonref.replace_refs(schema, proxies=False)
- del schema['definitions']
- # Pydantic forgets to put type in enums
- for prop in schema['properties'].values():
- if 'type' not in prop and 'enum' in prop:
- prop['type'] = 'string'
- return DynamicModelInfo(
- json_schema=schema,
- table_name=model_class.__tablename__
- )
-
-
class PluginInfo(Message):
name: str
description: str
@@ -87,8 +66,8 @@ class PipelineTask(Message):
class DynamicDomainScope(Message):
- type_name: str
- data: bytes
+ type_name: str
+ data: bytes
class PipelineData(Message):
diff --git a/backend/python/pydevlake/pydevlake/migration.py
b/backend/python/pydevlake/pydevlake/migration.py
index 375fe91b5..f6244c4d3 100644
--- a/backend/python/pydevlake/pydevlake/migration.py
+++ b/backend/python/pydevlake/pydevlake/migration.py
@@ -20,9 +20,11 @@ from datetime import datetime
from pydantic import BaseModel, Field
+from pydevlake.model_info import DynamicModelInfo
MIGRATION_SCRIPTS = []
+
class Dialect(Enum):
MYSQL = "mysql"
POSTGRESQL = "postgres"
@@ -65,8 +67,13 @@ class RenameTable(BaseModel):
new_name: str
+class CreateTable(BaseModel):
+ type: Literal["create_table"] = "create_table"
+ model_info: DynamicModelInfo
+
+
Operation = Annotated[
- Union[Execute, AddColumn, DropColumn, RenameColumn, DropTable,
RenameTable],
+ Union[Execute, AddColumn, DropColumn, RenameColumn, DropTable,
RenameTable, CreateTable],
Field(discriminator="type")
]
@@ -118,6 +125,13 @@ class MigrationScriptBuilder:
"""
self.operations.append(RenameTable(old_name=old_name,
new_name=new_name))
+ def create_tables(self, *model_classes):
+ """
+ Creates a table if it doesn't exist based on the object's fields.
+ """
+ for model_class in model_classes:
+
self.operations.append(CreateTable(model_info=DynamicModelInfo.from_model(model_class)))
+
def migration(version: int, name: Optional[str] = None):
"""
@@ -137,6 +151,7 @@ def migration(version: int, name: Optional[str] = None):
script = MigrationScript(operations=builder.operations,
version=version, name=name or fn.__name__)
MIGRATION_SCRIPTS.append(script)
return script
+
return wrapper
@@ -148,4 +163,4 @@ def _validate_version(version: int):
try:
datetime.strptime(str_version, "%Y%m%d%H%M%S")
except ValueError:
- raise err
+ raise err
diff --git a/backend/python/pydevlake/pydevlake/model.py
b/backend/python/pydevlake/pydevlake/model.py
index 4b7f9bfcc..4f12ea794 100644
--- a/backend/python/pydevlake/pydevlake/model.py
+++ b/backend/python/pydevlake/pydevlake/model.py
@@ -26,7 +26,8 @@ from pydantic import AnyUrl, SecretStr, validator
from sqlalchemy import Column, DateTime, Text
from sqlalchemy.orm import declared_attr
from sqlalchemy.inspection import inspect
-from sqlmodel import SQLModel, Field
+from sqlmodel import SQLModel
+from pydevlake import Field
inflect_engine = inflect.engine()
@@ -86,6 +87,7 @@ class DomainType(Enum):
class ScopeConfig(ToolTable, Model):
name: str = Field(default="default")
domain_types: list[DomainType] = Field(default=list(DomainType),
alias="entities")
+ connection_id: Optional[int]
class RawModel(SQLModel):
@@ -152,6 +154,7 @@ class DomainModel(NoPKModel):
class ToolScope(ToolModel):
id: str = Field(primary_key=True)
name: str
+ scope_config_id: Optional[int]
class DomainScope(DomainModel):
diff --git a/backend/python/pydevlake/pydevlake/model_info.py
b/backend/python/pydevlake/pydevlake/model_info.py
new file mode 100644
index 000000000..dcab6c7d3
--- /dev/null
+++ b/backend/python/pydevlake/pydevlake/model_info.py
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import jsonref
+from pydantic import BaseModel
+
+
+class DynamicModelInfo(BaseModel):
+ json_schema: dict
+ table_name: str
+
+ class Config:
+ allow_population_by_field_name = True
+
+ @staticmethod
+ def from_model(model_class):
+ schema = model_class.schema(by_alias=True)
+ if 'definitions' in schema:
+ # Replace $ref with actual schema
+ schema = jsonref.replace_refs(schema, proxies=False)
+ del schema['definitions']
+ # Pydantic forgets to put type in enums
+ for prop in schema['properties'].values():
+ if 'type' not in prop and 'enum' in prop:
+ prop['type'] = 'string'
+ return DynamicModelInfo(
+ json_schema=schema,
+ table_name=model_class.__tablename__
+ )
diff --git a/backend/python/pydevlake/pydevlake/plugin.py
b/backend/python/pydevlake/pydevlake/plugin.py
index c58331e8c..4d2f4993f 100644
--- a/backend/python/pydevlake/pydevlake/plugin.py
+++ b/backend/python/pydevlake/pydevlake/plugin.py
@@ -22,6 +22,7 @@ import sys
import fire
import pydevlake.message as msg
+import pydevlake.model_info
from pydevlake.subtasks import Subtask
from pydevlake.logger import logger
from pydevlake.ipc import PluginCommands
@@ -233,10 +234,10 @@ class Plugin(ABC):
description=self.description,
plugin_path=self._plugin_path(),
extension="datasource",
-
connection_model_info=msg.DynamicModelInfo.from_model(self.connection_type),
-
scope_model_info=msg.DynamicModelInfo.from_model(self.tool_scope_type),
-
scope_config_model_info=msg.DynamicModelInfo.from_model(self.scope_config_type),
-
tool_model_infos=[msg.DynamicModelInfo.from_model(stream.tool_model) for stream
in self._streams.values()],
+
connection_model_info=pydevlake.model_info.DynamicModelInfo.from_model(self.connection_type),
+
scope_model_info=pydevlake.model_info.DynamicModelInfo.from_model(self.tool_scope_type),
+
scope_config_model_info=pydevlake.model_info.DynamicModelInfo.from_model(self.scope_config_type),
+
tool_model_infos=[pydevlake.model_info.DynamicModelInfo.from_model(stream.tool_model)
for stream in self._streams.values()],
subtask_metas=subtask_metas,
migration_scripts=MIGRATION_SCRIPTS
)
diff --git a/backend/python/test/fakeplugin/fakeplugin/main.py
b/backend/python/test/fakeplugin/fakeplugin/main.py
index a3e1ab214..745c66ff2 100644
--- a/backend/python/test/fakeplugin/fakeplugin/main.py
+++ b/backend/python/test/fakeplugin/fakeplugin/main.py
@@ -13,34 +13,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from enum import Enum
-from datetime import datetime
-from typing import Optional
import json
+from datetime import datetime
-from pydantic import SecretStr
-
-from pydevlake import Plugin, Connection, Stream, ToolModel, ToolScope,
ScopeConfig, RemoteScopeGroup, DomainType, \
- Field, TestConnectionResult
+from fakeplugin.models import FakeConnection, FakeProject, FakeScopeConfig,
FakePipeline
+from pydevlake import Plugin, Stream, RemoteScopeGroup, DomainType,
TestConnectionResult
from pydevlake.domain_layer.devops import CicdScope, CICDPipeline, CICDStatus,
CICDResult, CICDType
-from pydevlake.migration import migration, MigrationScriptBuilder, Dialect
VALID_TOKEN = "this_is_a_valid_token"
-class FakePipeline(ToolModel, table=True):
- class State(Enum):
- PENDING = "pending"
- RUNNING = "running"
- FAILURE = "failure"
- SUCCESS = "success"
-
- id: str = Field(primary_key=True)
- started_at: Optional[datetime]
- finished_at: Optional[datetime]
- state: State
-
-
class FakePipelineStream(Stream):
tool_model = FakePipeline
domain_types = [DomainType.CICD]
@@ -94,18 +76,6 @@ class FakePipelineStream(Stream):
return fake_pipelines
-class FakeConnection(Connection):
- token: SecretStr
-
-
-class FakeProject(ToolScope, table=True):
- url: str
-
-
-class FakeScopeConfig(ScopeConfig):
- env: str
-
-
class FakePlugin(Plugin):
@property
def connection_type(self):
@@ -167,12 +137,7 @@ class FakePlugin(Plugin):
]
-# test migration
-@migration(20230630000001, name="populated _raw_data_table column for
fakeproject")
-def add_raw_data_params_table_to_scope(b: MigrationScriptBuilder):
- b.execute(f'UPDATE {FakeProject.__tablename__} SET _raw_data_table =
"_raw_fakeproject_scopes" WHERE 1=1', Dialect.MYSQL) #mysql only
- b.execute(f'''UPDATE {FakeProject.__tablename__} SET _raw_data_table =
'_raw_fakeproject_scopes' WHERE 1=1''', Dialect.POSTGRESQL) #mysql and postgres
-
+import fakeplugin.migrations # NEEDED
if __name__ == '__main__':
FakePlugin.start()
diff --git a/backend/python/test/fakeplugin/fakeplugin/migrations.py
b/backend/python/test/fakeplugin/fakeplugin/migrations.py
new file mode 100644
index 000000000..9e9439787
--- /dev/null
+++ b/backend/python/test/fakeplugin/fakeplugin/migrations.py
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from fakeplugin.models import FakeConnection, FakePipeline, FakeProject,
FakeScopeConfig
+from pydevlake.migration import migration, MigrationScriptBuilder, Dialect
+
+
+@migration(20230530000001, name="init schemas")
+def init_schemas(b: MigrationScriptBuilder):
+ b.create_tables(FakeConnection, FakePipeline, FakeProject, FakeScopeConfig)
+
+
+# test migration
+@migration(20230630000001, name="populated _raw_data_table column for
fakeproject")
+def add_raw_data_params_table_to_scope(b: MigrationScriptBuilder):
+ b.execute(f'UPDATE {FakeProject.__tablename__} SET _raw_data_table =
"_raw_fakeproject_scopes" WHERE 1=1', Dialect.MYSQL) #mysql only
+ b.execute(f'''UPDATE {FakeProject.__tablename__} SET _raw_data_table =
'_raw_fakeproject_scopes' WHERE 1=1''', Dialect.POSTGRESQL) #mysql and postgres
\ No newline at end of file
diff --git a/backend/python/test/fakeplugin/fakeplugin/models.py
b/backend/python/test/fakeplugin/fakeplugin/models.py
new file mode 100644
index 000000000..16f543dd8
--- /dev/null
+++ b/backend/python/test/fakeplugin/fakeplugin/models.py
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from datetime import datetime
+from enum import Enum
+from typing import Optional
+
+from pydantic import SecretStr
+
+from pydevlake import ScopeConfig, ToolScope, Connection, ToolModel, Field
+
+
+class FakeConnection(Connection):
+ token: SecretStr
+
+
+class FakeProject(ToolScope, table=True):
+ url: str
+
+
+class FakeScopeConfig(ScopeConfig):
+ env: str
+
+
+class FakePipeline(ToolModel, table=True):
+ class State(Enum):
+ PENDING = "pending"
+ RUNNING = "running"
+ FAILURE = "failure"
+ SUCCESS = "success"
+
+ id: str = Field(primary_key=True)
+ started_at: Optional[datetime]
+ finished_at: Optional[datetime]
+ state: State
diff --git a/backend/server/services/remote/init.go
b/backend/server/services/remote/init.go
index 4c6dd8ff1..5005b3cbc 100644
--- a/backend/server/services/remote/init.go
+++ b/backend/server/services/remote/init.go
@@ -43,10 +43,6 @@ func NewRemotePlugin(info *models.PluginInfo)
(models.RemotePlugin, errors.Error
if err != nil {
return nil, err
}
- err = plugin.RunAutoMigrations()
- if err != nil {
- return nil, err
- }
err = pluginCore.RegisterPlugin(info.Name, plugin)
if err != nil {
return nil, err
diff --git a/backend/server/services/remote/models/conversion.go
b/backend/server/services/remote/models/conversion.go
index 51c2e55e2..f5ec727ba 100644
--- a/backend/server/services/remote/models/conversion.go
+++ b/backend/server/services/remote/models/conversion.go
@@ -33,7 +33,11 @@ import (
)
func LoadTableModel(tableName string, schema utils.JsonObject, parentModel
any) (models.DynamicTabler, errors.Error) {
- structType, err := GenerateStructType(schema,
reflect.TypeOf(parentModel))
+ var baseType reflect.Type = nil
+ if parentModel != nil {
+ baseType = reflect.TypeOf(parentModel)
+ }
+ structType, err := GenerateStructType(schema, baseType)
if err != nil {
return nil, err
}
@@ -60,7 +64,7 @@ func GenerateStructType(schema utils.JsonObject, baseType
reflect.Type) (reflect
structFields = append(structFields, anonymousField)
}
for k, v := range props {
- if isBaseTypeField(k, baseType) {
+ if baseType != nil && isBaseTypeField(k, baseType) {
continue
}
spec := v.(utils.JsonObject)
@@ -136,6 +140,7 @@ var (
stringType = reflect.TypeOf("")
timeType = reflect.TypeOf(time.Time{})
jsonMapType = reflect.TypeOf(datatypes.JSONMap{})
+ jsonType = reflect.TypeOf(datatypes.JSON{})
)
func generateStructField(name string, schema utils.JsonObject, required bool)
(*reflect.StructField, errors.Error) {
@@ -156,11 +161,11 @@ func generateStructField(name string, schema
utils.JsonObject, required bool) (*
}
func getGoType(schema utils.JsonObject, required bool) (reflect.Type,
errors.Error) {
- jsonType, ok := schema["type"].(string)
+ rawType, ok := schema["type"].(string)
if !ok {
return nil, errors.BadInput.New("\"type\" property must be a
string")
}
- switch jsonType {
+ switch rawType {
//TODO: support more types
case "integer":
return int64Type, nil
@@ -179,10 +184,12 @@ func getGoType(schema utils.JsonObject, required bool)
(reflect.Type, errors.Err
} else {
return stringType, nil
}
+ case "array":
+ return jsonType, nil
case "object":
return jsonMapType, nil
default:
- return nil, errors.BadInput.New(fmt.Sprintf("Unsupported type
%s", jsonType))
+ return nil, errors.BadInput.New(fmt.Sprintf("Unsupported type
%s", rawType))
}
}
diff --git a/backend/server/services/remote/models/migration.go
b/backend/server/services/remote/models/migration.go
index 1862bf5da..c06b1d22f 100644
--- a/backend/server/services/remote/models/migration.go
+++ b/backend/server/services/remote/models/migration.go
@@ -19,15 +19,16 @@ package models
import (
"encoding/json"
-
"github.com/apache/incubator-devlake/core/context"
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
+ "github.com/apache/incubator-devlake/core/models/common"
"github.com/apache/incubator-devlake/core/plugin"
+ "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
)
type Operation interface {
- Execute(dal.Dal) errors.Error
+ Execute(basicRes context.BasicRes) errors.Error
}
type ExecuteOperation struct {
@@ -35,14 +36,15 @@ type ExecuteOperation struct {
Dialect *string `json:"dialect"`
}
-func (o ExecuteOperation) Execute(dal dal.Dal) errors.Error {
+func (o ExecuteOperation) Execute(basicRes context.BasicRes) errors.Error {
+ db := basicRes.GetDal()
if o.Dialect != nil {
- if dal.Dialect() == *o.Dialect {
- return dal.Exec(o.Sql)
+ if db.Dialect() == *o.Dialect {
+ return db.Exec(o.Sql)
}
return nil
} else {
- return dal.Exec(o.Sql)
+ return db.Exec(o.Sql)
}
}
@@ -54,14 +56,12 @@ type AddColumnOperation struct {
ColumnType dal.ColumnType `json:"column_type"`
}
-func (o AddColumnOperation) Execute(dal dal.Dal) errors.Error {
- if dal.HasColumn(o.Table, o.Column) {
- err := dal.DropColumns(o.Table, o.Column)
- if err != nil {
- return err
- }
+func (o AddColumnOperation) Execute(basicRes context.BasicRes) errors.Error {
+ db := basicRes.GetDal()
+ if db.HasColumn(o.Table, o.Column) {
+ return nil
}
- return dal.AddColumn(o.Table, o.Column, o.ColumnType)
+ return db.AddColumn(o.Table, o.Column, o.ColumnType)
}
type DropColumnOperation struct {
@@ -69,9 +69,10 @@ type DropColumnOperation struct {
Column string `json:"column"`
}
-func (o DropColumnOperation) Execute(dal dal.Dal) errors.Error {
- if dal.HasColumn(o.Table, o.Column) {
- return dal.DropColumns(o.Table, o.Column)
+func (o DropColumnOperation) Execute(basicRes context.BasicRes) errors.Error {
+ db := basicRes.GetDal()
+ if db.HasColumn(o.Table, o.Column) {
+ return db.DropColumns(o.Table, o.Column)
}
return nil
}
@@ -83,9 +84,10 @@ type DropTableOperation struct {
Column string `json:"column"`
}
-func (o DropTableOperation) Execute(dal dal.Dal) errors.Error {
- if dal.HasTable(o.Table) {
- return dal.DropTables(o.Table)
+func (o DropTableOperation) Execute(basicRes context.BasicRes) errors.Error {
+ db := basicRes.GetDal()
+ if db.HasTable(o.Table) {
+ return db.DropTables(o.Table)
}
return nil
}
@@ -97,17 +99,33 @@ type RenameTableOperation struct {
NewName string `json:"new_name"`
}
-func (o RenameTableOperation) Execute(dal dal.Dal) errors.Error {
- if !dal.HasTable(o.OldName) {
+func (o RenameTableOperation) Execute(basicRes context.BasicRes) errors.Error {
+ db := basicRes.GetDal()
+ if !db.HasTable(o.OldName) {
return nil
}
- if dal.HasTable(o.NewName) {
- err := dal.DropTables(o.NewName)
- if err != nil {
- return err
- }
+ return db.RenameTable(o.OldName, o.NewName)
+}
+
+type CreateTableOperation struct {
+ ModelInfo *DynamicModelInfo `json:"model_info"`
+}
+
+func (o CreateTableOperation) Execute(basicRes context.BasicRes) errors.Error {
+ db := basicRes.GetDal()
+ if db.HasTable(o.ModelInfo.TableName) {
+ basicRes.GetLogger().Warn(nil, "table %s already exists. It won't be created.", o.ModelInfo.TableName)
+ return nil
}
- return dal.RenameTable(o.OldName, o.NewName)
+ model, err := o.ModelInfo.LoadDynamicTabler(common.NoPKModel{})
+ if err != nil {
+ return err
+ }
+ err = api.CallDB(db.AutoMigrate, model.New())
+ if err != nil {
+ return err
+ }
+ return nil
}
var _ Operation = (*RenameTableOperation)(nil)
@@ -135,7 +153,7 @@ func (s *RemoteMigrationScript) UnmarshalJSON(data []byte) error {
s.operations = make([]Operation, len(rawScript.Operations))
for i, operationRaw := range rawScript.Operations {
operationMap := make(map[string]interface{})
- err := json.Unmarshal(operationRaw, &operationMap)
+ err = json.Unmarshal(operationRaw, &operationMap)
if err != nil {
return err
}
@@ -152,6 +170,8 @@ func (s *RemoteMigrationScript) UnmarshalJSON(data []byte) error {
operation = &DropTableOperation{}
case "rename_table":
operation = &RenameTableOperation{}
+ case "create_table":
+ operation = &CreateTableOperation{}
default:
return errors.BadInput.New("unsupported operation type")
}
@@ -165,9 +185,8 @@ func (s *RemoteMigrationScript) UnmarshalJSON(data []byte) error {
}
func (s *RemoteMigrationScript) Up(basicRes context.BasicRes) errors.Error {
- db := basicRes.GetDal()
for _, operation := range s.operations {
- err := operation.Execute(db)
+ err := operation.Execute(basicRes)
if err != nil {
return err
}
diff --git a/backend/server/services/remote/models/plugin_remote.go b/backend/server/services/remote/models/plugin_remote.go
index 358d89a96..988283630 100644
--- a/backend/server/services/remote/models/plugin_remote.go
+++ b/backend/server/services/remote/models/plugin_remote.go
@@ -18,7 +18,6 @@ limitations under the License.
package models
import (
- "github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/plugin"
)
@@ -31,5 +30,4 @@ type RemotePlugin interface {
plugin.PluginModel
plugin.PluginMigration
plugin.PluginSource
- RunAutoMigrations() errors.Error
}
diff --git a/backend/server/services/remote/plugin/plugin_impl.go b/backend/server/services/remote/plugin/plugin_impl.go
index 1e6c66d1c..7ced36f9d 100644
--- a/backend/server/services/remote/plugin/plugin_impl.go
+++ b/backend/server/services/remote/plugin/plugin_impl.go
@@ -230,36 +230,6 @@ func (p *remotePluginImpl) ApiResources() map[string]map[string]plugin.ApiResour
return p.resources
}
-func (p *remotePluginImpl) RunAutoMigrations() errors.Error {
- err := p.createTable(p.connectionTabler.New())
- if err != nil {
- return err
- }
- err = p.createTable(p.scopeTabler.New())
- if err != nil {
- return err
- }
- err = p.createTable(p.scopeConfigTabler.New())
- if err != nil {
- return err
- }
- for _, toolModelTabler := range p.toolModelTablers {
- err = p.createTable(toolModelTabler.New())
- if err != nil {
- return err
- }
- }
- return nil
-}
-
-func (p *remotePluginImpl) createTable(tbl coreModels.DynamicTabler) errors.Error {
- db := basicRes.GetDal()
- if db.HasTable(tbl.TableName()) {
- return nil
- }
- return api.CallDB(db.AutoMigrate, tbl)
-}
-
func (p *remotePluginImpl) OpenApiSpec() string {
return p.openApiSpec
}