This is an automated email from the ASF dual-hosted git repository.
hellostephen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 53795dbcf1b [feature](dbt) upgrade dbt-doris adapter to v1.0.0 (#61423)
53795dbcf1b is described below
commit 53795dbcf1bea68300488a2734aadc4b1d09b115
Author: catpineapple <[email protected]>
AuthorDate: Fri Mar 27 11:38:24 2026 +0800
[feature](dbt) upgrade dbt-doris adapter to v1.0.0 (#61423)
### What problem does this PR solve?
Upgrade dbt-doris version to 1.0.0
- Improve connection management: handle unknown database on first
connect
- Add cross-database relation support (database mapped to schema)
- Add transaction no-op for Doris (BEGIN/COMMIT not supported in mixed
DDL/DML)
- Add data_type_code_to_name mapping for mysql-connector type codes
- Improve view materialization with CREATE OR REPLACE VIEW
- Add table exchange for table materialization re-runs
- Add table/column comment support (persist_docs)
- Add comprehensive test suite: connection, cross-database, incremental,
seed, snapshot, table, and view tests
- Remove hardcoded test credentials, use environment variables with safe
defaults
---
.../dbt-doris/dbt/adapters/doris/__version__.py | 2 +-
.../dbt-doris/dbt/adapters/doris/connections.py | 104 +++++++--
extension/dbt-doris/dbt/adapters/doris/impl.py | 86 +++-----
extension/dbt-doris/dbt/adapters/doris/relation.py | 8 +-
.../dbt/include/doris/macros/adapters/columns.sql | 53 ++++-
.../dbt/include/doris/macros/adapters/metadata.sql | 8 +-
.../dbt/include/doris/macros/adapters/relation.sql | 8 +
.../doris/macros/materializations/table/table.sql | 13 +-
.../materializations/view/create_view_as.sql | 9 +-
.../doris/macros/materializations/view/view.sql | 49 +++--
.../dbt/include/doris/sample_profiles.yml | 13 +-
extension/dbt-doris/setup.py | 7 +-
extension/dbt-doris/test/conftest.py | 10 +-
.../__version__.py => test/functional/__init__.py} | 9 -
.../functional/adapter/__init__.py} | 9 -
.../test/functional/adapter/test_basic.py | 60 ++----
.../functional/adapter/test_doris_connection.py | 90 ++++++++
.../adapter/test_doris_cross_database.py | 232 +++++++++++++++++++++
.../functional/adapter/test_doris_incremental.py | 177 ++++++++++++++++
.../test/functional/adapter/test_doris_seed.py | 142 +++++++++++++
.../test/functional/adapter/test_doris_snapshot.py | 89 ++++++++
.../test/functional/adapter/test_doris_table.py | 186 +++++++++++++++++
.../test/functional/adapter/test_doris_view.py | 113 ++++++++++
23 files changed, 1285 insertions(+), 192 deletions(-)
diff --git a/extension/dbt-doris/dbt/adapters/doris/__version__.py
b/extension/dbt-doris/dbt/adapters/doris/__version__.py
index a86e6374ad4..41ecfd00d5c 100644
--- a/extension/dbt-doris/dbt/adapters/doris/__version__.py
+++ b/extension/dbt-doris/dbt/adapters/doris/__version__.py
@@ -22,4 +22,4 @@
# this 'version' must be set !!!
# otherwise the adapters will not be found after the 'dbt init xxx' command
-version = "0.4.0"
+version = "1.0.0"
diff --git a/extension/dbt-doris/dbt/adapters/doris/connections.py
b/extension/dbt-doris/dbt/adapters/doris/connections.py
index 7503210150e..4367fe65a3c 100644
--- a/extension/dbt-doris/dbt/adapters/doris/connections.py
+++ b/extension/dbt-doris/dbt/adapters/doris/connections.py
@@ -23,6 +23,7 @@ from dataclasses import dataclass
from typing import ContextManager, Optional, Union
import mysql.connector
+from mysql.connector.constants import FieldType
from dbt import exceptions
from dbt.adapters.contracts.connection import Credentials
@@ -48,7 +49,7 @@ class DorisCredentials(Credentials):
return "doris"
def _connection_keys(self):
- return "host", "port", "user", "schema"
+ return "host", "port", "username", "schema"
@property
def unique_field(self) -> str:
@@ -78,6 +79,7 @@ class DorisConnectionManager(SQLConnectionManager):
"port": credentials.port,
"user": credentials.username,
"password": credentials.password,
+ "database": credentials.schema,
"buffered": True,
"charset": "utf8",
"get_warnings": True,
@@ -86,16 +88,28 @@ class DorisConnectionManager(SQLConnectionManager):
try:
connection.handle = mysql.connector.connect(**kwargs)
connection.state = 'open'
- except mysql.connector.Error:
-
- try:
- logger.debug("Failed connection without supplying the
`database`. "
- "Trying again with `database` included.")
- connection.handle = mysql.connector.connect(**kwargs)
- connection.state = 'open'
- except mysql.connector.Error as e:
-
- logger.debug("Got an error when attempting to open a mysql "
+ except mysql.connector.Error as e:
+ # If the database does not exist yet, connect without it.
+ # dbt will create the database/schema via create_schema().
+ if e.errno == 1049: # Unknown database
+ logger.debug(
+ f"Database '{credentials.schema}' does not exist, "
+ "connecting without database."
+ )
+ kwargs.pop("database", None)
+ try:
+ connection.handle = mysql.connector.connect(**kwargs)
+ connection.state = 'open'
+ except mysql.connector.Error as e2:
+ logger.debug(
+ "Got an error when attempting to open a Doris "
+ "connection: '{}'".format(e2)
+ )
+ connection.handle = None
+ connection.state = 'fail'
+ raise exceptions.DbtRuntimeError(str(e2))
+ else:
+ logger.debug("Got an error when attempting to open a Doris "
"connection: '{}'"
.format(e))
@@ -109,7 +123,6 @@ class DorisConnectionManager(SQLConnectionManager):
def get_credentials(cls, credentials):
return credentials
- @classmethod
def cancel(self, connection: Connection):
connection.handle.close()
@@ -140,13 +153,72 @@ class DorisConnectionManager(SQLConnectionManager):
raise exceptions.DbtRuntimeError(str(e)) from e
@classmethod
+ def data_type_code_to_name(cls, type_code) -> str:
+ """Map mysql-connector type codes to Doris type names."""
+ mapping = {
+ FieldType.TINY: "TINYINT",
+ FieldType.SHORT: "SMALLINT",
+ FieldType.LONG: "INT",
+ FieldType.FLOAT: "FLOAT",
+ FieldType.DOUBLE: "DOUBLE",
+ FieldType.NULL: "NULL",
+ FieldType.TIMESTAMP: "DATETIME",
+ FieldType.LONGLONG: "BIGINT",
+ FieldType.INT24: "INT",
+ FieldType.DATE: "DATE",
+ FieldType.TIME: "TIME",
+ FieldType.DATETIME: "DATETIME",
+ FieldType.YEAR: "INT",
+ FieldType.NEWDATE: "DATE",
+ FieldType.VARCHAR: "VARCHAR",
+ FieldType.BIT: "BOOLEAN",
+ FieldType.JSON: "JSON",
+ FieldType.NEWDECIMAL: "DECIMAL",
+ FieldType.DECIMAL: "DECIMAL",
+ FieldType.ENUM: "VARCHAR",
+ FieldType.SET: "VARCHAR",
+ FieldType.TINY_BLOB: "STRING",
+ FieldType.MEDIUM_BLOB: "STRING",
+ FieldType.LONG_BLOB: "STRING",
+ FieldType.BLOB: "STRING",
+ FieldType.VAR_STRING: "VARCHAR",
+ FieldType.STRING: "STRING",
+ FieldType.GEOMETRY: "STRING",
+ }
+ return mapping.get(type_code, "STRING")
+
def begin(self):
"""
-
https://doris.apache.org/docs/data-operate/import/import-scenes/load-atomicity/
- Doris's inserting always transaction, ignore it
+ Doris BEGIN limitation: once BEGIN is issued, only
INSERT/UPDATE/DELETE/
+ COMMIT/ROLLBACK are allowed — SELECT and DDL will error with:
+ "This is in a transaction, only insert, update, delete, commit,
rollback
+ is acceptable."
+
+ We must NOT send literal BEGIN SQL. We only maintain dbt-core's
+ transaction_open flag so the framework tracks state correctly.
"""
- pass
+ connection = self.get_thread_connection()
+ if connection.transaction_open is True:
+ raise exceptions.DbtRuntimeError(
+ "Tried to begin a new transaction on connection '{}', but "
+ "it already had one open!".format(connection.name)
+ )
+ connection.transaction_open = True
+ return connection
- @classmethod
def commit(self):
+ """
+ Do not send literal COMMIT SQL — bare COMMIT without BEGIN is a no-op
+ in Doris, but we avoid it for clarity. Just reset the framework flag.
+ """
+ connection = self.get_thread_connection()
+ connection.transaction_open = False
+ return connection
+
+ def add_begin_query(self):
+ """Override to prevent literal 'BEGIN' SQL from being sent to Doris."""
+ pass
+
+ def add_commit_query(self):
+ """Override to prevent literal 'COMMIT' SQL from being sent to
Doris."""
pass
diff --git a/extension/dbt-doris/dbt/adapters/doris/impl.py
b/extension/dbt-doris/dbt/adapters/doris/impl.py
index 082a94f5ad3..9ea9b0ecb0d 100644
--- a/extension/dbt-doris/dbt/adapters/doris/impl.py
+++ b/extension/dbt-doris/dbt/adapters/doris/impl.py
@@ -20,36 +20,28 @@
from dbt.adapters.sql import SQLAdapter
-from concurrent.futures import Future
from enum import Enum
from typing import (
Any,
- Callable,
Dict,
+ FrozenSet,
Iterable,
- Iterator,
List,
- Mapping,
Optional,
Set,
Tuple,
- Type,
- Union,
)
import agate
import dbt.exceptions
-from dbt.adapters.base.impl import _expect_row_value, catch_as_completed
-from dbt.adapters.base.relation import InformationSchema, BaseRelation
+from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.doris.column import DorisColumn
from dbt.adapters.doris.connections import DorisConnectionManager
from dbt.adapters.doris.relation import DorisRelation
from dbt.adapters.protocol import AdapterConfig
+from dbt.adapters.contracts.relation import RelationConfig, RelationType
from dbt.adapters.sql.impl import LIST_RELATIONS_MACRO_NAME,
LIST_SCHEMAS_MACRO_NAME
from dbt_common.clients.agate_helper import table_from_rows
-from dbt.contracts.graph.manifest import Manifest
-from dbt.adapters.contracts.relation import RelationType
-from dbt_common.utils import executor
from dbt.adapters.doris.doris_column_item import DorisColumnItem
@@ -139,60 +131,44 @@ class DorisAdapter(SQLAdapter):
return relations
- def get_catalog(self, manifest):
- schema_map = self._get_catalog_schemas(manifest)
-
- with executor(self.config) as tpe:
- futures: List[Future[agate.Table]] = []
- for info, schemas in schema_map.items():
- for schema in schemas:
- futures.append(
- tpe.submit_connected(
- self,
- schema,
- self._get_one_catalog,
- info,
- [schema],
- manifest,
- )
- )
- catalogs, exceptions = catch_as_completed(futures)
- return catalogs, exceptions
+ def _catalog_filter_table(
+ self, table: agate.Table, used_schemas: FrozenSet[Tuple[str, str]]
+ ) -> agate.Table:
+ table = table_from_rows(
+ table.rows,
+ table.column_names,
+ text_only_columns=["table_schema", "table_name"],
+ )
+ return table.where(self._catalog_filter_schemas(used_schemas))
- @classmethod
- def _catalog_filter_schemas(cls, manifest: Manifest) ->
Callable[[agate.Row], bool]:
- schemas = frozenset((None, s.lower()) for d, s in
manifest.get_used_schemas())
+ @staticmethod
+ def _catalog_filter_schemas(
+ used_schemas: FrozenSet[Tuple[str, str]]
+ ):
+ schemas = frozenset((None, s.lower()) for d, s in used_schemas)
- def _(row: agate.Row) -> bool:
- table_database = _expect_row_value("table_database", row)
- table_schema = _expect_row_value("table_schema", row)
+ def predicate(row: agate.Row) -> bool:
+ table_database = row.get("table_database")
+ table_schema = row.get("table_schema")
if table_schema is None:
return False
return (table_database, table_schema.lower()) in schemas
- return _
+ return predicate
@classmethod
- def _catalog_filter_table(cls, table: agate.Table, manifest: Manifest) ->
agate.Table:
- table = table_from_rows(
- table.rows,
- table.column_names,
- text_only_columns=["table_schema", "table_name"],
- )
- return table.where(cls._catalog_filter_schemas(manifest))
+ def convert_number_type(cls, agate_table: agate.Table, col_idx: int) ->
str:
+ decimals = agate_table.aggregate(agate.HasNulls(col_idx))
+ return "double" if decimals else "bigint"
- def _get_one_catalog(
- self,
- information_schema: InformationSchema,
- schemas: Set[str],
- manifest: Manifest,
- ) -> agate.Table:
- if len(schemas) != 1:
- dbt.exceptions.raise_compiler_error(
- f"Expected only one schema in Doris _get_one_catalog, found "
f"{schemas}"
- )
+ @classmethod
+ def convert_boolean_type(cls, agate_table: agate.Table, col_idx: int) ->
str:
+ return "boolean"
- return super()._get_one_catalog(information_schema, schemas, manifest)
+ def quote_seed_column(self, column: str, quote_config: Optional[bool]) ->
str:
+ if quote_config is None or quote_config:
+ return self.quote(column)
+ return column
# Methods used in adapter tests
def timestamp_add_sql(self, add_to: str, number: int = 1, interval: str =
"hour") -> str:
diff --git a/extension/dbt-doris/dbt/adapters/doris/relation.py
b/extension/dbt-doris/dbt/adapters/doris/relation.py
index 17956d1e400..797adc841a5 100644
--- a/extension/dbt-doris/dbt/adapters/doris/relation.py
+++ b/extension/dbt-doris/dbt/adapters/doris/relation.py
@@ -45,8 +45,12 @@ class DorisRelation(BaseRelation):
quote_character: str = "`"
def __post_init__(self):
- if self.database != self.schema and self.database:
- raise DbtRuntimeError(f"Cannot set database {self.database} in
Doris!")
+ # In Doris, database and schema are the same concept — there is only
+ # one namespace level. When a source or model sets "database" to a
+ # value that differs from "schema", treat database AS the schema so
+ # that cross-database references like {{ source(...) }} work correctly.
+ if self.database and self.database != self.schema:
+ self.path.schema = self.database
def render(self):
if self.include_policy.database and self.include_policy.schema:
diff --git a/extension/dbt-doris/dbt/include/doris/macros/adapters/columns.sql
b/extension/dbt-doris/dbt/include/doris/macros/adapters/columns.sql
index 42ecff07556..fd708926e70 100644
--- a/extension/dbt-doris/dbt/include/doris/macros/adapters/columns.sql
+++ b/extension/dbt-doris/dbt/include/doris/macros/adapters/columns.sql
@@ -17,21 +17,34 @@
{% macro doris__get_columns_in_relation(relation) -%}
{% call statement('get_columns_in_relation', fetch_result=True) %}
- select column_name as `column`,
- data_type as 'dtype',
- character_maximum_length as char_size,
- numeric_precision,
- numeric_scale
-from information_schema.columns
-where table_schema = '{{ relation.schema }}'
- and table_name = '{{ relation.identifier }}'
+ select column_name as `column`,
+ column_type as `dtype`,
+ character_maximum_length as char_size,
+ numeric_precision,
+ numeric_scale
+ from information_schema.columns
+ where table_schema = '{{ relation.schema }}'
+ and table_name = '{{ relation.identifier }}'
+ order by ordinal_position
{% endcall %}
{% set table = load_result('get_columns_in_relation').table %}
{{ return(sql_convert_columns_in_relation(table)) }}
{%- endmacro %}
-{% macro doris__alter_column_type(relation,column_name,new_column_type) -%}
-'''Changes column name or data type'''
+{% macro sql_convert_columns_in_relation(table) -%}
+ {% set columns = [] %}
+ {% for row in table %}
+ {% set col_name = row['column'] %}
+ {% set col_type = row['dtype'] %}
+ {% do columns.append(api.Column.create(col_name, col_type)) %}
+ {% endfor %}
+ {{ return(columns) }}
+{%- endmacro %}
+
+{% macro doris__alter_column_type(relation, column_name, new_column_type) -%}
+ {% call statement('alter_column_type') %}
+ alter table {{ relation }} modify column {{ column_name }} {{
new_column_type }}
+ {% endcall %}
{% endmacro %}
{% macro columns_and_constraints(table_type="table") %}
@@ -55,3 +68,23 @@ where table_schema = '{{ relation.schema }}'
{{ return(columns_and_constraints("view")) }}
{%- endmacro %}
+{% macro doris__alter_relation_comment(relation, relation_comment) -%}
+ {#-- Views do not support MODIFY COMMENT, only tables do --#}
+ {% if relation.type != 'view' %}
+ {% call statement('alter_relation_comment') %}
+ alter table {{ relation }} modify comment '{{ relation_comment }}'
+ {% endcall %}
+ {% endif %}
+{% endmacro %}
+
+{% macro doris__alter_column_comment(relation, column_dict) -%}
+ {#-- Views do not support MODIFY COLUMN COMMENT; column comments for views
+ are set at CREATE VIEW time via column definitions --#}
+ {% if relation.type != 'view' %}
+ {% for column_name, column_comment in column_dict.items() %}
+ {% call statement('alter_column_comment') %}
+ alter table {{ relation }} modify column `{{ column_name }}`
comment '{{ column_comment }}'
+ {% endcall %}
+ {% endfor %}
+ {% endif %}
+{% endmacro %}
diff --git a/extension/dbt-doris/dbt/include/doris/macros/adapters/metadata.sql
b/extension/dbt-doris/dbt/include/doris/macros/adapters/metadata.sql
index 9927c3d4fac..8119c70fa9d 100644
--- a/extension/dbt-doris/dbt/include/doris/macros/adapters/metadata.sql
+++ b/extension/dbt-doris/dbt/include/doris/macros/adapters/metadata.sql
@@ -41,7 +41,8 @@
when table_type = 'VIEW' then 'view'
else table_type
end as table_type,
- null as table_owner
+ null as table_owner,
+ table_comment
from information_schema.tables
),
columns as (
@@ -49,11 +50,10 @@
null as "table_database",
table_schema as "table_schema",
table_name as "table_name",
- null as "table_comment",
column_name as "column_name",
ordinal_position as "column_index",
data_type as "column_type",
- null as "column_comment"
+ column_comment as "column_comment"
from information_schema.columns
)
select
@@ -61,7 +61,7 @@
columns.table_schema,
columns.table_name,
tables.table_type,
- columns.table_comment,
+ tables.table_comment as "table_comment",
tables.table_owner,
columns.column_name,
columns.column_index,
diff --git a/extension/dbt-doris/dbt/include/doris/macros/adapters/relation.sql
b/extension/dbt-doris/dbt/include/doris/macros/adapters/relation.sql
index 3b2950de42e..537bd78d0a2 100644
--- a/extension/dbt-doris/dbt/include/doris/macros/adapters/relation.sql
+++ b/extension/dbt-doris/dbt/include/doris/macros/adapters/relation.sql
@@ -212,6 +212,14 @@
{% do return([false, new_relation]) %}
{% endmacro %}
+{% macro drop_relation_if_exists(relation) %}
+ {{ doris__drop_relation(relation) }}
+{% endmacro %}
+
+{% macro create_indexes(relation) -%}
+ {# Doris does not support traditional indexes; this is a no-op #}
+{%- endmacro %}
+
{% macro catalog_source(catalog,database,table) -%}
`{{catalog}}`.`{{database}}`.`{{table}}`
{%- endmacro %}
diff --git
a/extension/dbt-doris/dbt/include/doris/macros/materializations/table/table.sql
b/extension/dbt-doris/dbt/include/doris/macros/materializations/table/table.sql
index 051e54d4a95..204daf51aa5 100644
---
a/extension/dbt-doris/dbt/include/doris/macros/materializations/table/table.sql
+++
b/extension/dbt-doris/dbt/include/doris/macros/materializations/table/table.sql
@@ -28,9 +28,14 @@
-- drop the temp relations if they exist already in the database
{{ doris__drop_relation(preexisting_intermediate_relation) }}
+ {{ run_hooks(pre_hooks, inside_transaction=False) }}
+
+ -- `BEGIN` happens here:
+ {{ run_hooks(pre_hooks, inside_transaction=True) }}
+
-- build model
{% call statement('main') -%}
- {{ get_create_table_as_sql(False, intermediate_relation, sql) }}
+ {{ doris__create_table_as(False, intermediate_relation, sql) }}
{%- endcall %}
{% if existing_relation -%}
@@ -39,6 +44,7 @@
{{ adapter.rename_relation(intermediate_relation, target_relation) }}
{% endif %}
+ {{ run_hooks(post_hooks, inside_transaction=True) }}
{% set should_revoke = should_revoke(existing_relation,
full_refresh_mode=True) %}
{% do apply_grants(target_relation, grant_config,
should_revoke=should_revoke) %}
@@ -46,8 +52,13 @@
-- alter relation comment
{% do persist_docs(target_relation, model) %}
+ -- `COMMIT` happens here
+ {% do adapter.commit() %}
+
-- finally, drop the existing/backup relation after the commit
{{ doris__drop_relation(intermediate_relation) }}
+ {{ run_hooks(post_hooks, inside_transaction=False) }}
+
{{ return({'relations': [target_relation]}) }}
{% endmaterialization %}
diff --git
a/extension/dbt-doris/dbt/include/doris/macros/materializations/view/create_view_as.sql
b/extension/dbt-doris/dbt/include/doris/macros/materializations/view/create_view_as.sql
index 8bb845ed7e2..b9306b57be0 100644
---
a/extension/dbt-doris/dbt/include/doris/macros/materializations/view/create_view_as.sql
+++
b/extension/dbt-doris/dbt/include/doris/macros/materializations/view/create_view_as.sql
@@ -19,12 +19,5 @@
{%- set sql_header = config.get('sql_header', none) -%}
{{ sql_header if sql_header is not none }}
- create View {{ relation }} {{doris__view_colume_comment()}} as {{ sql }};
+ create or replace view {{ relation }} as {{ sql }};
{%- endmacro %}
-
-{% macro doris__view_colume_comment() -%}
- {% set cols = model.get('columns') %}
- {% if cols %}
- ( {{doris__get_view_columns_comment()}} )
- {%- endif -%}
-{%- endmacro %}
\ No newline at end of file
diff --git
a/extension/dbt-doris/dbt/include/doris/macros/materializations/view/view.sql
b/extension/dbt-doris/dbt/include/doris/macros/materializations/view/view.sql
index fc0baa2bee3..b205a6a3c8b 100644
---
a/extension/dbt-doris/dbt/include/doris/macros/materializations/view/view.sql
+++
b/extension/dbt-doris/dbt/include/doris/macros/materializations/view/view.sql
@@ -19,30 +19,35 @@
{%- set existing_relation = load_cached_relation(this) -%}
{%- set target_relation = this.incorporate(type='view') -%}
- {%- set intermediate_relation = make_intermediate_relation(target_relation)
-%}
- {%- set preexisting_intermediate_relation =
load_cached_relation(intermediate_relation) -%}
-
-
- {{ drop_relation_if_exists(intermediate_relation) }}
-
-
- {% if existing_relation is not none %}
- --todo: exchange
- {% call statement('main_test') -%}
- {{ get_create_view_as_sql(intermediate_relation, sql) }}
- {%- endcall %}
- {{ drop_relation_if_exists(intermediate_relation) }}
- {{ drop_relation_if_exists(target_relation) }}
- {% call statement('main') -%}
- {{ get_create_view_as_sql(target_relation, sql) }}
- {%- endcall %}
- {# {{ adapter.rename_relation(intermediate_relation, target_relation) }} #}
- {% else %}
- {% call statement('main') -%}
- {{ get_create_view_as_sql(target_relation, sql) }}
- {%- endcall %}
+
+ -- grab current tables grants config for comparision later on
+ {% set grant_config = config.get('grants') %}
+
+ {{ run_hooks(pre_hooks, inside_transaction=False) }}
+
+ -- `BEGIN` happens here:
+ {{ run_hooks(pre_hooks, inside_transaction=True) }}
+
+ {% if existing_relation is not none and existing_relation.type != 'view' %}
+ {{ doris__drop_relation(existing_relation) }}
{% endif %}
+ -- build model
+ {% call statement('main') -%}
+ {{ doris__create_view_as(target_relation, sql) }}
+ {%- endcall %}
+
+ {% set should_revoke = should_revoke(existing_relation,
full_refresh_mode=True) %}
+ {% do apply_grants(target_relation, grant_config,
should_revoke=should_revoke) %}
+
+ {% do persist_docs(target_relation, model) %}
+
+ {{ run_hooks(post_hooks, inside_transaction=True) }}
+
+ -- `COMMIT` happens here
+ {% do adapter.commit() %}
+
+ {{ run_hooks(post_hooks, inside_transaction=False) }}
{{ return({'relations': [target_relation]}) }}
diff --git a/extension/dbt-doris/dbt/include/doris/sample_profiles.yml
b/extension/dbt-doris/dbt/include/doris/sample_profiles.yml
index 6532a195bdd..a6c644b4b6a 100644
--- a/extension/dbt-doris/dbt/include/doris/sample_profiles.yml
+++ b/extension/dbt-doris/dbt/include/doris/sample_profiles.yml
@@ -1,6 +1,3 @@
-#!/usr/bin/env python
-# encoding: utf-8
-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -22,9 +19,9 @@ default:
outputs:
dev:
type: doris
- host: <host>
- port: <port_num>
- username: <user>
- password: <pass>
- database: <db>
+ host: <host>
+ port: <port_num>
+ username: <user>
+ password: <pass>
+ schema: <db>
target: dev
diff --git a/extension/dbt-doris/setup.py b/extension/dbt-doris/setup.py
index dc02948af7e..88e783184a1 100644
--- a/extension/dbt-doris/setup.py
+++ b/extension/dbt-doris/setup.py
@@ -22,8 +22,8 @@ from setuptools import find_namespace_packages, setup
package_name = "dbt-doris"
# make sure this always matches dbt/adapters/{adapter}/__version__.py
-package_version = "0.4.0"
-dbt_core_version = "1.8.0"
+package_version = "1.0.0"
+dbt_core_version = "1.10.4"
description = """The doris adapter plugin for dbt """
setup(
@@ -38,8 +38,7 @@ setup(
include_package_data=True,
install_requires=[
"dbt-core>={}".format(dbt_core_version),
- "mysql-connector-python>=8.0.0,<8.3",
- "urllib3~=1.0",
+ "mysql-connector-python>=8.0.0",
],
python_requires=">=3.9",
)
diff --git a/extension/dbt-doris/test/conftest.py
b/extension/dbt-doris/test/conftest.py
index e12af32eadf..d1baf197579 100644
--- a/extension/dbt-doris/test/conftest.py
+++ b/extension/dbt-doris/test/conftest.py
@@ -21,9 +21,8 @@
import pytest
import os
-import json
-# Import the fuctional fixtures as a plugin
+# Import the functional fixtures as a plugin
# Note: fixtures with session scope need to be local
pytest_plugins = ["dbt.tests.fixtures.project"]
@@ -32,11 +31,12 @@ pytest_plugins = ["dbt.tests.fixtures.project"]
# The profile dictionary, used to write out profiles.yml
@pytest.fixture(scope="class")
def dbt_profile_target():
- return {
+ return {
"type": "doris",
"threads": 1,
"host": os.getenv("DORIS_TEST_HOST", "127.0.0.1"),
- "user": os.getenv("DORIS_TEST_USER", "root"),
+ "port": int(os.getenv("DORIS_TEST_PORT", 9030)),
+ "username": os.getenv("DORIS_TEST_USER", "root"),
"password": os.getenv("DORIS_TEST_PASSWORD", ""),
- "port": os.getenv("DORIS_TEST_PORT", 9030),
+ "schema": os.getenv("DORIS_TEST_SCHEMA", "dbt_test"),
}
diff --git a/extension/dbt-doris/dbt/adapters/doris/__version__.py
b/extension/dbt-doris/test/functional/__init__.py
similarity index 82%
copy from extension/dbt-doris/dbt/adapters/doris/__version__.py
copy to extension/dbt-doris/test/functional/__init__.py
index a86e6374ad4..13a83393a91 100644
--- a/extension/dbt-doris/dbt/adapters/doris/__version__.py
+++ b/extension/dbt-doris/test/functional/__init__.py
@@ -1,6 +1,3 @@
-#!/usr/bin/env python
-# encoding: utf-8
-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -17,9 +14,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
-
-# this 'version' must be set !!!
-# otherwise the adapters will not be found after the 'dbt init xxx' command
-
-version = "0.4.0"
diff --git a/extension/dbt-doris/dbt/adapters/doris/__version__.py
b/extension/dbt-doris/test/functional/adapter/__init__.py
similarity index 82%
copy from extension/dbt-doris/dbt/adapters/doris/__version__.py
copy to extension/dbt-doris/test/functional/adapter/__init__.py
index a86e6374ad4..13a83393a91 100644
--- a/extension/dbt-doris/dbt/adapters/doris/__version__.py
+++ b/extension/dbt-doris/test/functional/adapter/__init__.py
@@ -1,6 +1,3 @@
-#!/usr/bin/env python
-# encoding: utf-8
-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -17,9 +14,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
-
-# this 'version' must be set !!!
-# otherwise the adapters will not be found after the 'dbt init xxx' command
-
-version = "0.4.0"
diff --git a/extension/dbt-doris/test/functional/adapter/test_basic.py
b/extension/dbt-doris/test/functional/adapter/test_basic.py
index 2f6ee1d1c4b..53709574365 100644
--- a/extension/dbt-doris/test/functional/adapter/test_basic.py
+++ b/extension/dbt-doris/test/functional/adapter/test_basic.py
@@ -25,24 +25,27 @@ from dbt.tests.adapter.basic.test_singular_tests import
BaseSingularTests
from dbt.tests.adapter.basic.test_singular_tests_ephemeral import (
BaseSingularTestsEphemeral
)
-from dbt.tests.util import run_dbt,
check_relations_equal,check_result_nodes_by_name,relation_from_name,check_relation_types
+from dbt.tests.util import (
+ run_dbt,
+ check_relations_equal,
+ check_result_nodes_by_name,
+ relation_from_name,
+ check_relation_types,
+)
from dbt.tests.adapter.basic.test_empty import BaseEmpty
from dbt.tests.adapter.basic.test_ephemeral import BaseEphemeral
-from dbt.tests.adapter.basic.test_incremental import BaseIncremental
from dbt.tests.adapter.basic.test_generic_tests import BaseGenericTests
-from dbt.tests.adapter.basic.test_snapshot_check_cols import
BaseSnapshotCheckCols
-from dbt.tests.adapter.basic.test_snapshot_timestamp import
BaseSnapshotTimestamp
from dbt.tests.adapter.basic.test_adapter_methods import BaseAdapterMethod
-class TestSimpleMaterializationsdoris(BaseSimpleMaterializations):
+class TestSimpleMaterializationsDoris(BaseSimpleMaterializations):
def test_base(self, project):
results = run_dbt(["seed"])
assert len(results) == 1
-
+
results = run_dbt()
assert len(results) == 3
-
+
check_result_nodes_by_name(results, ["view_model", "table_model",
"swappable"])
expected = {
"base": "table",
@@ -51,24 +54,27 @@ class
TestSimpleMaterializationsdoris(BaseSimpleMaterializations):
"swappable": "table",
}
check_relation_types(project.adapter, expected)
-
+
relation = relation_from_name(project.adapter, "base")
result = project.run_sql(f"select count(*) as num_rows from
{relation}", fetch="one")
assert result[0] == 10
-
+
check_relations_equal(project.adapter, ["base", "view_model",
"table_model", "swappable"])
-
-class TestSingularTestsdoris(BaseSingularTests):
+
+class TestSingularTestsDoris(BaseSingularTests):
pass
-class TestSingularTestsEphemeraldoris(BaseSingularTestsEphemeral):
+
+class TestSingularTestsEphemeralDoris(BaseSingularTestsEphemeral):
pass
-class TestEmptydoris(BaseEmpty):
+
+class TestEmptyDoris(BaseEmpty):
pass
-class TestEphemeraldoris(BaseEphemeral):
+
+class TestEphemeralDoris(BaseEphemeral):
def test_ephemeral(self, project):
results = run_dbt(["seed"])
assert len(results) == 1
@@ -80,35 +86,13 @@ class TestEphemeraldoris(BaseEphemeral):
result = project.run_sql(f"select count(*) as num_rows from
{relation}", fetch="one")
assert result[0] == 10
check_relations_equal(project.adapter, ["base", "view_model",
"table_model"])
-
[email protected](reason="Incremental for doris table model bust be 'unique' ")
-class TestIncrementaldoris(BaseIncremental):
- def test_incremental(self, project):
- results = run_dbt(["seed"])
- assert len(results) == 2
- relation = relation_from_name(project.adapter, "base")
- result = project.run_sql(f"select count(*) as num_rows from
{relation}", fetch="one")
- assert result[0] == 10
- relation = relation_from_name(project.adapter, "added")
- result = project.run_sql(f"select count(*) as num_rows from
{relation}", fetch="one")
- assert result[0] == 20
- results = run_dbt(["run", "--vars", "seed_name: base"])
- assert len(results) == 1
- check_relations_equal(project.adapter, ["base", "incremental"])
-
-class TestGenericTestsdoris(BaseGenericTests):
- pass
[email protected](reason="Snapshot for doris table model bust be 'unique'")
-class TestSnapshotCheckColsdoris(BaseSnapshotCheckCols):
+class TestGenericTestsDoris(BaseGenericTests):
pass
[email protected](reason="Snapshot for doris table model bust be 'unique'")
-class TestSnapshotTimestampdoris(BaseSnapshotTimestamp):
- pass
-class TestBaseAdapterMethoddoris(BaseAdapterMethod):
+class TestBaseAdapterMethodDoris(BaseAdapterMethod):
def test_adapter_methods(self, project, equal_tables):
result = run_dbt()
assert len(result) == 3
diff --git
a/extension/dbt-doris/test/functional/adapter/test_doris_connection.py
b/extension/dbt-doris/test/functional/adapter/test_doris_connection.py
new file mode 100644
index 00000000000..cadbc274723
--- /dev/null
+++ b/extension/dbt-doris/test/functional/adapter/test_doris_connection.py
@@ -0,0 +1,90 @@
+#!/usr/bin/env python
+# encoding: utf-8
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Tests for Doris connection management: dbt debug, schema operations,
+catalog generation, and multiple data types.
+"""
+
+import pytest
+from dbt.tests.util import run_dbt, relation_from_name
+
+
+MULTI_TYPE_MODEL_SQL = """
+{{ config(
+ materialized='table',
+ distributed_by=['id'],
+ properties={'replication_num': '1'}
+) }}
+
+select cast(1 as int) as id,
+ cast('hello' as varchar(50)) as str_col,
+ cast(3.14 as decimal(10,2)) as dec_col,
+ cast(true as boolean) as bool_col,
+ cast('2024-01-01' as date) as date_col,
+ cast('2024-01-01 12:00:00' as datetime) as datetime_col,
+ cast(100 as bigint) as bigint_col
+"""
+
+
+class TestDorisDebug:
+ """Test that dbt debug can connect to Doris."""
+
+ @pytest.fixture(scope="class")
+ def models(self):
+ return {"multi_type.sql": MULTI_TYPE_MODEL_SQL}
+
+ def test_debug(self, project):
+ results = run_dbt(["debug"])
+ assert results is None or True
+
+
+class TestDorisMultiType:
+ """Test various Doris column types via CTAS."""
+
+ @pytest.fixture(scope="class")
+ def models(self):
+ return {"multi_type.sql": MULTI_TYPE_MODEL_SQL}
+
+ def test_multi_type(self, project):
+ run_dbt(["run"])
+
+ relation = relation_from_name(project.adapter, "multi_type")
+ result = project.run_sql(
+ f"select id, str_col, dec_col, bool_col, date_col, bigint_col from
{relation}",
+ fetch="one",
+ )
+ assert result[0] == 1
+ assert result[1] == "hello"
+ assert result[4].isoformat() == "2024-01-01"
+ assert result[5] == 100
+
+
+class TestDorisCatalog:
+ """Test catalog generation (dbt docs generate)."""
+
+ @pytest.fixture(scope="class")
+ def models(self):
+ return {"multi_type.sql": MULTI_TYPE_MODEL_SQL}
+
+ def test_catalog(self, project):
+ run_dbt(["run"])
+ results = run_dbt(["docs", "generate"])
+ assert results is not None
diff --git
a/extension/dbt-doris/test/functional/adapter/test_doris_cross_database.py
b/extension/dbt-doris/test/functional/adapter/test_doris_cross_database.py
new file mode 100644
index 00000000000..49043e8d981
--- /dev/null
+++ b/extension/dbt-doris/test/functional/adapter/test_doris_cross_database.py
@@ -0,0 +1,232 @@
+#!/usr/bin/env python
+# encoding: utf-8
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Tests for Doris cross-database (database != schema) relation handling.
+
+Doris has only one namespace level (database = schema). When a source or model
+sets "database" to a different value, the adapter must NOT crash and should map
+the database value to schema for correct SQL rendering.
+"""
+
+import pytest
+from dbt.tests.util import run_dbt
+from dbt.adapters.doris.relation import DorisRelation
+
+
+# ---------------------------------------------------------------------------
+# Unit tests — no database connection needed
+# ---------------------------------------------------------------------------
+
+class TestDorisRelationDatabaseMapping:
+ """Verify DorisRelation maps database → schema without raising errors."""
+
+ def test_database_equals_schema(self):
+ """database == schema should work as before."""
+ rel = DorisRelation.create(
+ database="my_db",
+ schema="my_db",
+ identifier="my_table",
+ )
+ assert rel.schema == "my_db"
+ assert rel.identifier == "my_table"
+
+ def test_database_differs_from_schema(self):
+ """database != schema should NOT raise; database is used as schema."""
+ rel = DorisRelation.create(
+ database="other_db",
+ schema="default_schema",
+ identifier="my_table",
+ )
+ # database value should overwrite schema
+ assert rel.schema == "other_db"
+ assert rel.identifier == "my_table"
+
+ def test_database_none(self):
+ """database=None should leave schema unchanged."""
+ rel = DorisRelation.create(
+ database=None,
+ schema="my_schema",
+ identifier="my_table",
+ )
+ assert rel.schema == "my_schema"
+
+ def test_database_empty_string(self):
+ """database='' (falsy) should leave schema unchanged."""
+ rel = DorisRelation.create(
+ database="",
+ schema="my_schema",
+ identifier="my_table",
+ )
+ assert rel.schema == "my_schema"
+
+ def test_render_excludes_database(self):
+ """Rendered SQL should only include schema.identifier, never
database."""
+ rel = DorisRelation.create(
+ database="cross_db",
+ schema="original_schema",
+ identifier="orders",
+ )
+ rendered = rel.render()
+ # schema was mapped to "cross_db"
+ assert "cross_db" in rendered
+ assert "orders" in rendered
+ # Should be schema.identifier format, no three-part name
+ assert rendered.count(".") == 1
+
+ def test_render_without_database(self):
+ """Normal case: database=None renders schema.identifier."""
+ rel = DorisRelation.create(
+ database=None,
+ schema="analytics",
+ identifier="users",
+ )
+ rendered = rel.render()
+ assert "analytics" in rendered
+ assert "users" in rendered
+
+
+# ---------------------------------------------------------------------------
+# Integration tests — require a running Doris instance
+# ---------------------------------------------------------------------------
+
+# A source table created in a DIFFERENT database to verify cross-db access.
+# The test will: 1) create database cross_db_test, 2) create a table in it,
+# 3) define a dbt source pointing to that database, 4) build a model that
+# reads from it.
+
+CROSS_DB_SOURCE_YML = """
+version: 2
+
+sources:
+ - name: cross_db_src
+ database: cross_db_test
+ schema: cross_db_test
+ tables:
+ - name: remote_table
+"""
+
+CROSS_DB_MODEL_SQL = """
+{{ config(
+ materialized='table',
+ distributed_by=['id'],
+ properties={'replication_num': '1'}
+) }}
+
+select id, val from {{ source('cross_db_src', 'remote_table') }}
+"""
+
+# A source defined with ONLY database set (no explicit schema)
+# Should still work — database is mapped to schema.
+DB_ONLY_SOURCE_YML = """
+version: 2
+
+sources:
+ - name: db_only_src
+ database: cross_db_test
+ tables:
+ - name: remote_table
+"""
+
+DB_ONLY_MODEL_SQL = """
+{{ config(
+ materialized='view'
+) }}
+
+select id, val from {{ source('db_only_src', 'remote_table') }}
+"""
+
+
+class TestDorisCrossDatabaseSource:
+ """End-to-end: model reads from a source in a different Doris database."""
+
+ @pytest.fixture(scope="class")
+ def models(self):
+ return {
+ "cross_db_model.sql": CROSS_DB_MODEL_SQL,
+ "sources.yml": CROSS_DB_SOURCE_YML,
+ }
+
+ @pytest.fixture(scope="class", autouse=True)
+ def setup_cross_db(self, project):
+ """Create the remote database and table before tests, clean up
after."""
+ project.run_sql("CREATE DATABASE IF NOT EXISTS cross_db_test")
+ project.run_sql(
+ "CREATE TABLE IF NOT EXISTS cross_db_test.remote_table "
+ "(id INT, val VARCHAR(50)) "
+ "DISTRIBUTED BY HASH(id) BUCKETS 1 "
+ "PROPERTIES('replication_num' = '1')"
+ )
+ project.run_sql(
+ "INSERT INTO cross_db_test.remote_table VALUES (1, 'hello'), (2,
'world')"
+ )
+ yield
+ project.run_sql("DROP TABLE IF EXISTS cross_db_test.remote_table")
+ project.run_sql("DROP DATABASE IF EXISTS cross_db_test")
+
+ def test_cross_database_source(self, project):
+ results = run_dbt(["run"])
+ assert len(results) == 1
+
+ result = project.run_sql(
+ "select count(*) from cross_db_model", fetch="one"
+ )
+ assert result[0] == 2
+
+ result = project.run_sql(
+ "select val from cross_db_model where id = 1", fetch="one"
+ )
+ assert result[0] == "hello"
+
+
+class TestDorisDatabaseOnlySource:
+ """Source defined with database but no explicit schema — should map
correctly."""
+
+ @pytest.fixture(scope="class")
+ def models(self):
+ return {
+ "db_only_model.sql": DB_ONLY_MODEL_SQL,
+ "sources.yml": DB_ONLY_SOURCE_YML,
+ }
+
+ @pytest.fixture(scope="class", autouse=True)
+ def setup_cross_db(self, project):
+ project.run_sql("CREATE DATABASE IF NOT EXISTS cross_db_test")
+ project.run_sql(
+ "CREATE TABLE IF NOT EXISTS cross_db_test.remote_table "
+ "(id INT, val VARCHAR(50)) "
+ "DISTRIBUTED BY HASH(id) BUCKETS 1 "
+ "PROPERTIES('replication_num' = '1')"
+ )
+ project.run_sql(
+ "INSERT INTO cross_db_test.remote_table VALUES (1, 'aaa'), (2,
'bbb')"
+ )
+ yield
+ project.run_sql("DROP TABLE IF EXISTS cross_db_test.remote_table")
+ project.run_sql("DROP DATABASE IF EXISTS cross_db_test")
+
+ def test_database_only_source(self, project):
+ results = run_dbt(["run"])
+ assert len(results) == 1
+
+ result = project.run_sql(
+ "select count(*) from db_only_model", fetch="one"
+ )
+ assert result[0] == 2
diff --git
a/extension/dbt-doris/test/functional/adapter/test_doris_incremental.py
b/extension/dbt-doris/test/functional/adapter/test_doris_incremental.py
new file mode 100644
index 00000000000..7b88b62f8ff
--- /dev/null
+++ b/extension/dbt-doris/test/functional/adapter/test_doris_incremental.py
@@ -0,0 +1,177 @@
+#!/usr/bin/env python
+# encoding: utf-8
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Tests for Doris incremental materialization:
+- append strategy (duplicate key table)
+- insert_overwrite strategy (unique key table)
+- full refresh mode
+"""
+
+import pytest
+from dbt.tests.util import run_dbt, relation_from_name
+
+
+# -- Append strategy: works with duplicate key tables --
+
+INCREMENTAL_APPEND_SQL = """
+{{ config(
+ materialized='incremental',
+ incremental_strategy='append',
+ distributed_by=['id'],
+ properties={'replication_num': '1'}
+) }}
+
+{% if is_incremental() %}
+select 4 as id, 'dave' as name
+union all
+select 5 as id, 'eve' as name
+{% else %}
+select 1 as id, 'alice' as name
+union all
+select 2 as id, 'bob' as name
+union all
+select 3 as id, 'charlie' as name
+{% endif %}
+"""
+
+
+# -- Insert overwrite strategy: works with unique key tables --
+
+INCREMENTAL_UNIQUE_SQL = """
+{{ config(
+ materialized='incremental',
+ incremental_strategy='insert_overwrite',
+ unique_key=['id'],
+ distributed_by=['id'],
+ properties={'replication_num': '1'}
+) }}
+
+{% if is_incremental() %}
+select 1 as id, 'alice_updated' as name, 150 as score
+union all
+select 4 as id, 'dave' as name, 400 as score
+{% else %}
+select 1 as id, 'alice' as name, 100 as score
+union all
+select 2 as id, 'bob' as name, 200 as score
+union all
+select 3 as id, 'charlie' as name, 300 as score
+{% endif %}
+"""
+
+
+# -- Full refresh --
+
+INCREMENTAL_FULL_REFRESH_SQL = """
+{{ config(
+ materialized='incremental',
+ incremental_strategy='append',
+ distributed_by=['id'],
+ properties={'replication_num': '1'}
+) }}
+
+select 1 as id, 'only_row' as name
+"""
+
+
+class TestDorisIncrementalAppend:
+ @pytest.fixture(scope="class")
+ def models(self):
+ return {"incremental_append.sql": INCREMENTAL_APPEND_SQL}
+
+ def test_incremental_append(self, project):
+ # First run: creates table with 3 rows
+ results = run_dbt(["run"])
+ assert len(results) == 1
+
+ relation = relation_from_name(project.adapter, "incremental_append")
+ result = project.run_sql(f"select count(*) from {relation}",
fetch="one")
+ assert result[0] == 3
+
+ # Second run: appends 2 more rows
+ results = run_dbt(["run"])
+ assert len(results) == 1
+
+ result = project.run_sql(f"select count(*) from {relation}",
fetch="one")
+ assert result[0] == 5
+
+
+class TestDorisIncrementalUniqueKey:
+ @pytest.fixture(scope="class")
+ def models(self):
+ return {"incremental_unique.sql": INCREMENTAL_UNIQUE_SQL}
+
+ def test_incremental_unique_key(self, project):
+ # First run: creates unique key table with 3 rows
+ results = run_dbt(["run"])
+ assert len(results) == 1
+
+ relation = relation_from_name(project.adapter, "incremental_unique")
+ result = project.run_sql(f"select count(*) from {relation}",
fetch="one")
+ assert result[0] == 3
+
+ # Second run: upserts 2 rows (1 update + 1 new)
+ results = run_dbt(["run"])
+ assert len(results) == 1
+
+ result = project.run_sql(f"select count(*) from {relation}",
fetch="one")
+ assert result[0] == 4
+
+ # Verify id=1 was updated
+ result = project.run_sql(
+ f"select name, score from {relation} where id = 1", fetch="one"
+ )
+ assert result[0] == "alice_updated"
+ assert result[1] == 150
+
+ # Verify id=2 remains unchanged
+ result = project.run_sql(
+ f"select name from {relation} where id = 2", fetch="one"
+ )
+ assert result[0] == "bob"
+
+
+class TestDorisIncrementalFullRefresh:
+ @pytest.fixture(scope="class")
+ def models(self):
+ return {"incremental_fr.sql": INCREMENTAL_FULL_REFRESH_SQL}
+
+ def test_full_refresh(self, project):
+ # First run
+ results = run_dbt(["run"])
+ assert len(results) == 1
+
+ relation = relation_from_name(project.adapter, "incremental_fr")
+ result = project.run_sql(f"select count(*) from {relation}",
fetch="one")
+ assert result[0] == 1
+
+ # Second run: normal incremental (appends same row)
+ results = run_dbt(["run"])
+ assert len(results) == 1
+ result = project.run_sql(f"select count(*) from {relation}",
fetch="one")
+ assert result[0] == 2
+
+ # Full refresh: should reset to 1 row
+ results = run_dbt(["run", "--full-refresh"])
+ assert len(results) == 1
+
+ result = project.run_sql(f"select count(*) from {relation}",
fetch="one")
+ assert result[0] == 1
diff --git a/extension/dbt-doris/test/functional/adapter/test_doris_seed.py
b/extension/dbt-doris/test/functional/adapter/test_doris_seed.py
new file mode 100644
index 00000000000..159df7aca0f
--- /dev/null
+++ b/extension/dbt-doris/test/functional/adapter/test_doris_seed.py
@@ -0,0 +1,142 @@
+#!/usr/bin/env python
+# encoding: utf-8
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Tests for Doris seed functionality: CSV loading with type inference
+and custom column types.
+"""
+
+import pytest
+from dbt.tests.util import run_dbt, relation_from_name
+
+
+SEED_CSV = """id,name,score,active
+1,alice,100,true
+2,bob,200,false
+3,charlie,300,true
+"""
+
+SEED_WITH_TYPES_YML = """
+seeds:
+ - name: seed_typed
+ config:
+ column_types:
+ id: int
+ name: varchar(100)
+ score: bigint
+ active: boolean
+"""
+
+SEED_TYPED_CSV = """id,name,score,active
+1,alice,100,true
+2,bob,200,false
+"""
+
+MODEL_FROM_SEED_SQL = """
+{{ config(
+ materialized='table',
+ distributed_by=['id'],
+ properties={'replication_num': '1'}
+) }}
+
+select * from {{ ref('seed_basic') }}
+"""
+
+
+class TestDorisSeedBasic:
+ @pytest.fixture(scope="class")
+ def seeds(self):
+ return {"seed_basic.csv": SEED_CSV}
+
+ @pytest.fixture(scope="class")
+ def project_config_update(self):
+ return {"seeds": {"+properties": {"replication_num": "1"}}}
+
+ @pytest.fixture(scope="class")
+ def models(self):
+ return {}
+
+ def test_seed_basic(self, project):
+ results = run_dbt(["seed"])
+ assert len(results) == 1
+
+ relation = relation_from_name(project.adapter, "seed_basic")
+ result = project.run_sql(f"select count(*) from {relation}",
fetch="one")
+ assert result[0] == 3
+
+
+class TestDorisSeedWithTypes:
+ @pytest.fixture(scope="class")
+ def seeds(self):
+ return {"seed_typed.csv": SEED_TYPED_CSV}
+
+ @pytest.fixture(scope="class")
+ def project_config_update(self):
+ return {"seeds": {
+ "+properties": {"replication_num": "1"},
+ "test": {"seed_typed": {"column_types": {
+ "id": "int",
+ "name": "varchar(100)",
+ "score": "bigint",
+ "active": "boolean",
+ }}},
+ }}
+
+ @pytest.fixture(scope="class")
+ def models(self):
+ return {}
+
+ def test_seed_with_types(self, project):
+ results = run_dbt(["seed"])
+ assert len(results) == 1
+
+ relation = relation_from_name(project.adapter, "seed_typed")
+ result = project.run_sql(f"select count(*) from {relation}",
fetch="one")
+ assert result[0] == 2
+
+
+class TestDorisSeedAndModel:
+ """Test that a model can reference a seed."""
+
+ @pytest.fixture(scope="class")
+ def seeds(self):
+ return {"seed_basic.csv": SEED_CSV}
+
+ @pytest.fixture(scope="class")
+ def project_config_update(self):
+ return {"seeds": {"+properties": {"replication_num": "1"}}}
+
+ @pytest.fixture(scope="class")
+ def models(self):
+ return {"model_from_seed.sql": MODEL_FROM_SEED_SQL}
+
+ def test_seed_then_model(self, project):
+ results = run_dbt(["seed"])
+ assert len(results) == 1
+
+ results = run_dbt(["run"])
+ assert len(results) == 1
+
+ relation = relation_from_name(project.adapter, "model_from_seed")
+ result = project.run_sql(f"select count(*) from {relation}",
fetch="one")
+ assert result[0] == 3
+
+ result = project.run_sql(f"select sum(score) from {relation}",
fetch="one")
+ assert result[0] == 600
diff --git a/extension/dbt-doris/test/functional/adapter/test_doris_snapshot.py
b/extension/dbt-doris/test/functional/adapter/test_doris_snapshot.py
new file mode 100644
index 00000000000..6ff207a40dd
--- /dev/null
+++ b/extension/dbt-doris/test/functional/adapter/test_doris_snapshot.py
@@ -0,0 +1,89 @@
+#!/usr/bin/env python
+# encoding: utf-8
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Tests for Doris snapshot (SCD Type 2) using check strategy.
+Doris snapshots require a unique key table for the upsert-based merge.
+"""
+
+import pytest
+from dbt.tests.util import run_dbt, relation_from_name
+
+
+SOURCE_TABLE_SQL = """
+{{ config(
+ materialized='table',
+ unique_key=['id'],
+ distributed_by=['id'],
+ properties={'replication_num': '1'}
+) }}
+
+select 1 as id, 'alice' as name, 100 as score
+union all
+select 2 as id, 'bob' as name, 200 as score
+"""
+
+SNAPSHOT_SQL = """
+{% snapshot snap_users %}
+
+{{
+ config(
+ target_database=target.schema,
+ target_schema=target.schema,
+ unique_key='id',
+ strategy='check',
+ check_cols=['name', 'score'],
+ )
+}}
+
+select * from {{ ref('snap_source') }}
+
+{% endsnapshot %}
+"""
+
+
+class TestDorisSnapshot:
+ @pytest.fixture(scope="class")
+ def models(self):
+ return {"snap_source.sql": SOURCE_TABLE_SQL}
+
+ @pytest.fixture(scope="class")
+ def snapshots(self):
+ return {"snap_users.sql": SNAPSHOT_SQL}
+
+ def test_snapshot(self, project):
+ # Create the source table
+ results = run_dbt(["run"])
+ assert len(results) == 1
+
+ # First snapshot
+ results = run_dbt(["snapshot"])
+ assert len(results) == 1
+
+ relation = relation_from_name(project.adapter, "snap_users")
+ result = project.run_sql(f"select count(*) from {relation}",
fetch="one")
+ assert result[0] == 2
+
+ # All rows should have dbt_valid_to = NULL (current)
+ result = project.run_sql(
+ f"select count(*) from {relation} where dbt_valid_to is null",
+ fetch="one",
+ )
+ assert result[0] == 2
diff --git a/extension/dbt-doris/test/functional/adapter/test_doris_table.py
b/extension/dbt-doris/test/functional/adapter/test_doris_table.py
new file mode 100644
index 00000000000..76d12361d87
--- /dev/null
+++ b/extension/dbt-doris/test/functional/adapter/test_doris_table.py
@@ -0,0 +1,186 @@
+#!/usr/bin/env python
+# encoding: utf-8
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Tests for Doris table materialization with various key types,
+distributed by, properties, and table re-creation (exchange).
+"""
+
+import pytest
+from dbt.tests.util import run_dbt, relation_from_name
+
+
+# -- Model SQL definitions --
+
+TABLE_BASIC_SQL = """
+{{ config(
+ materialized='table',
+ distributed_by=['id'],
+ properties={'replication_num': '1'}
+) }}
+
+select 1 as id, 'alice' as name, 100 as score
+union all
+select 2 as id, 'bob' as name, 200 as score
+union all
+select 3 as id, 'charlie' as name, 300 as score
+"""
+
+TABLE_DUPLICATE_KEY_SQL = """
+{{ config(
+ materialized='table',
+ duplicate_key=['id'],
+ distributed_by=['id'],
+ properties={'replication_num': '1'}
+) }}
+
+select 1 as id, 'alice' as name
+union all
+select 2 as id, 'bob' as name
+"""
+
+TABLE_BUCKETS_AUTO_SQL = """
+{{ config(
+ materialized='table',
+ duplicate_key=['id'],
+ distributed_by=['id'],
+ buckets=3,
+ properties={'replication_num': '1'}
+) }}
+
+select 1 as id, 'alice' as name
+union all
+select 2 as id, 'bob' as name
+"""
+
+TABLE_WITH_COMMENT_SQL = """
+{{ config(
+ materialized='table',
+ distributed_by=['id'],
+ properties={'replication_num': '1'}
+) }}
+
+select 1 as id, 'test' as name
+"""
+
+TABLE_WITH_COMMENT_SCHEMA_YML = """
+version: 2
+models:
+ - name: table_with_comment
+ description: "This is a test table with comments"
+ columns:
+ - name: id
+ description: "The primary key"
+ - name: name
+ description: "The user name"
+"""
+
+
+class TestDorisTableBasic:
+ @pytest.fixture(scope="class")
+ def models(self):
+ return {"table_basic.sql": TABLE_BASIC_SQL}
+
+ def test_table_basic(self, project):
+ results = run_dbt(["run"])
+ assert len(results) == 1
+
+ relation = relation_from_name(project.adapter, "table_basic")
+ result = project.run_sql(f"select count(*) from {relation}",
fetch="one")
+ assert result[0] == 3
+
+ result = project.run_sql(f"select sum(score) from {relation}",
fetch="one")
+ assert result[0] == 600
+
+
+class TestDorisTableDuplicateKey:
+ @pytest.fixture(scope="class")
+ def models(self):
+ return {"table_dup_key.sql": TABLE_DUPLICATE_KEY_SQL}
+
+ def test_duplicate_key(self, project):
+ results = run_dbt(["run"])
+ assert len(results) == 1
+
+ relation = relation_from_name(project.adapter, "table_dup_key")
+ result = project.run_sql(
+ f"show create table {relation}", fetch="one"
+ )
+ assert "DUPLICATE KEY" in result[1]
+
+
+class TestDorisTableBuckets:
+ @pytest.fixture(scope="class")
+ def models(self):
+ return {"table_buckets.sql": TABLE_BUCKETS_AUTO_SQL}
+
+ def test_table_buckets(self, project):
+ results = run_dbt(["run"])
+ assert len(results) == 1
+
+ relation = relation_from_name(project.adapter, "table_buckets")
+ result = project.run_sql(
+ f"show create table {relation}", fetch="one"
+ )
+ assert "BUCKETS 3" in result[1]
+ assert "DUPLICATE KEY" in result[1]
+
+
+class TestDorisTableRerun:
+ """Test that re-running table materialization replaces the table via
exchange."""
+
+ @pytest.fixture(scope="class")
+ def models(self):
+ return {"table_basic.sql": TABLE_BASIC_SQL}
+
+ def test_table_rerun(self, project):
+ # First run
+ results = run_dbt(["run"])
+ assert len(results) == 1
+
+ relation = relation_from_name(project.adapter, "table_basic")
+ result = project.run_sql(f"select count(*) from {relation}",
fetch="one")
+ assert result[0] == 3
+
+ # Second run (should replace via exchange)
+ results = run_dbt(["run"])
+ assert len(results) == 1
+
+ result = project.run_sql(f"select count(*) from {relation}",
fetch="one")
+ assert result[0] == 3
+
+
+class TestDorisTableWithComment:
+ @pytest.fixture(scope="class")
+ def models(self):
+ return {
+ "table_with_comment.sql": TABLE_WITH_COMMENT_SQL,
+ "schema.yml": TABLE_WITH_COMMENT_SCHEMA_YML,
+ }
+
+ def test_table_comment(self, project):
+ results = run_dbt(["run"])
+ assert len(results) == 1
+
+ relation = relation_from_name(project.adapter, "table_with_comment")
+ result = project.run_sql(
+ f"show create table {relation}", fetch="one"
+ )
+ assert "This is a test table with comments" in result[1]
diff --git a/extension/dbt-doris/test/functional/adapter/test_doris_view.py
b/extension/dbt-doris/test/functional/adapter/test_doris_view.py
new file mode 100644
index 00000000000..d91620ad6e3
--- /dev/null
+++ b/extension/dbt-doris/test/functional/adapter/test_doris_view.py
@@ -0,0 +1,113 @@
+#!/usr/bin/env python
+# encoding: utf-8
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Tests for Doris view materialization: create, replace (CREATE OR REPLACE VIEW),
+and view over different sources.
+"""
+
+import pytest
+from dbt.tests.util import run_dbt, relation_from_name
+
+
+SOURCE_TABLE_SQL = """
+{{ config(
+ materialized='table',
+ distributed_by=['id'],
+ properties={'replication_num': '1'}
+) }}
+
+select 1 as id, 'alice' as name, 100 as score
+union all
+select 2 as id, 'bob' as name, 200 as score
+"""
+
+VIEW_BASIC_SQL = """
+{{ config(materialized='view') }}
+
+select id, name, score from {{ ref('source_table') }}
+"""
+
+VIEW_FILTERED_SQL = """
+{{ config(materialized='view') }}
+
+select id, name, score from {{ ref('source_table') }} where score >= 200
+"""
+
+
+class TestDorisViewCreate:
+ @pytest.fixture(scope="class")
+ def models(self):
+ return {
+ "source_table.sql": SOURCE_TABLE_SQL,
+ "view_basic.sql": VIEW_BASIC_SQL,
+ }
+
+ def test_view_create(self, project):
+ results = run_dbt(["run"])
+ assert len(results) == 2
+
+ relation = relation_from_name(project.adapter, "view_basic")
+ result = project.run_sql(f"select count(*) from {relation}",
fetch="one")
+ assert result[0] == 2
+
+
+class TestDorisViewReplace:
+ """Test re-running view uses CREATE OR REPLACE VIEW successfully."""
+
+ @pytest.fixture(scope="class")
+ def models(self):
+ return {
+ "source_table.sql": SOURCE_TABLE_SQL,
+ "view_basic.sql": VIEW_BASIC_SQL,
+ }
+
+ def test_view_replace(self, project):
+ # First run
+ results = run_dbt(["run"])
+ assert len(results) == 2
+
+ # Second run - should succeed via CREATE OR REPLACE VIEW
+ results = run_dbt(["run"])
+ assert len(results) == 2
+
+ relation = relation_from_name(project.adapter, "view_basic")
+ result = project.run_sql(f"select count(*) from {relation}",
fetch="one")
+ assert result[0] == 2
+
+
+class TestDorisViewFiltered:
+ @pytest.fixture(scope="class")
+ def models(self):
+ return {
+ "source_table.sql": SOURCE_TABLE_SQL,
+ "view_filtered.sql": VIEW_FILTERED_SQL,
+ }
+
+ def test_view_filtered(self, project):
+ results = run_dbt(["run"])
+ assert len(results) == 2
+
+ relation = relation_from_name(project.adapter, "view_filtered")
+ result = project.run_sql(f"select count(*) from {relation}",
fetch="one")
+ assert result[0] == 1
+
+ result = project.run_sql(f"select name from {relation}", fetch="one")
+ assert result[0] == "bob"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]