unittest.mock
Here is my operator:
```
class CloudDataCatalogCreateEntryOperator(BaseOperator):
    """Create an entry through ``CloudDataCatalogHook``, reusing it if it exists.

    ``execute`` first calls ``create_entry`` on the hook. When that raises
    ``AlreadyExists`` the operator logs the fact and fetches the entry with
    ``get_entry`` instead. In both cases the last ``/``-separated segment of
    ``result.name`` is pushed to XCom under the key ``"entry_id"`` and the
    result is returned as a dict via ``MessageToDict``.

    All constructor arguments other than ``gcp_conn_id`` are forwarded to the
    hook methods unchanged; ``gcp_conn_id`` is passed to the hook constructor.

    :param location: forwarded to the hook as ``location``.
    :param entry_group: forwarded to the hook as ``entry_group``.
    :param entry_id: forwarded to ``create_entry`` as ``entry_id`` and to
        ``get_entry`` as ``entry``.
    :param entry: forwarded to ``create_entry`` as ``entry``.
    :param project_id: forwarded to the hook as ``project_id``.
    :param retry: forwarded to the hook as ``retry``.
    :param timeout: forwarded to the hook as ``timeout``.
    :param metadata: forwarded to the hook as ``metadata``.
    :param gcp_conn_id: connection id handed to ``CloudDataCatalogHook``.
    """

    template_fields = (
        "location", "entry_group", "entry_id", "entry", "project_id",
        "retry", "timeout", "metadata", "gcp_conn_id",
    )

    @apply_defaults
    def __init__(
        self,
        location: str,
        entry_group: str,
        entry_id: str,
        entry: Union[Dict, Entry],
        project_id: str = None,
        retry: Retry = None,
        timeout: float = None,
        metadata: Sequence[Tuple[str, str]] = None,
        gcp_conn_id: str = "google_cloud_default",
        *args,
        **kwargs
    ) -> None:
        super().__init__(*args, **kwargs)
        # Connection settings.
        self.gcp_conn_id = gcp_conn_id
        self.project_id = project_id
        # Identification of the entry to create.
        self.location = location
        self.entry_group = entry_group
        self.entry_id = entry_id
        self.entry = entry
        # Per-call options forwarded to the hook.
        self.retry = retry
        self.timeout = timeout
        self.metadata = metadata

    def execute(self, context: Dict):
        """Create the entry (or fetch the existing one) and publish its id.

        :param context: task context; ``context["task_instance"]`` must
            provide ``xcom_push``.
        :return: ``MessageToDict`` of the created or fetched entry.
        """
        catalog_hook = CloudDataCatalogHook(gcp_conn_id=self.gcp_conn_id)

        # Keyword arguments that create_entry and get_entry have in common.
        shared_kwargs = dict(
            location=self.location,
            entry_group=self.entry_group,
            project_id=self.project_id,
            retry=self.retry,
            timeout=self.timeout,
            metadata=self.metadata,
        )

        try:
            entry_obj = catalog_hook.create_entry(
                entry_id=self.entry_id, entry=self.entry, **shared_kwargs
            )
        except AlreadyExists:
            self.log.info("Entry already exists. Skipping create operation.")
            # get_entry takes the entry *id* under the keyword ``entry``.
            entry_obj = catalog_hook.get_entry(entry=self.entry_id, **shared_kwargs)

        # The id is whatever follows the final "/" in the resource name.
        resolved_id = entry_obj.name.rpartition("/")[-1]
        self.log.info("Current entry_id ID: %s", resolved_id)
        context["task_instance"].xcom_push(key="entry_id", value=resolved_id)
        return MessageToDict(entry_obj)
```
Here is my unittest:
@mock.patch(
    "airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook",
    **{
        "return_value.create_entry.side_effect": AlreadyExists(message="message"),
        "return_value.get_entry.return_value": TEST_ENTRY,
    },
)
def test_assert_valid_hook_call_when_exists(self, mock_hook) -> None:
    """Check the fallback path taken when ``create_entry`` raises AlreadyExists.

    The patched hook raises ``AlreadyExists`` from ``create_entry`` and
    returns ``TEST_ENTRY`` from ``get_entry``. The test verifies the hook
    construction, both hook calls, the XCom push and the returned dict.
    """
    # Arguments the operator is expected to hand to both hook methods.
    shared_kwargs = {
        "location": TEST_LOCATION,
        "entry_group": TEST_ENTRY_GROUP_ID,
        "project_id": TEST_PROJECT_ID,
        "retry": TEST_RETRY,
        "timeout": TEST_TIMEOUT,
        "metadata": TEST_METADATA,
    }

    operator = CloudDataCatalogCreateEntryOperator(
        task_id="task_id",
        entry_id=TEST_ENTRY_ID,
        entry=TEST_ENTRY,
        gcp_conn_id=TEST_GCP_CONN_ID,
        **shared_kwargs,
    )
    task_instance = mock.MagicMock()
    returned = operator.execute(context={"task_instance": task_instance})

    hook_instance = mock_hook.return_value
    mock_hook.assert_called_once_with(gcp_conn_id=TEST_GCP_CONN_ID)
    hook_instance.create_entry.assert_called_once_with(
        entry_id=TEST_ENTRY_ID, entry=TEST_ENTRY, **shared_kwargs
    )
    # The fallback lookup receives the entry id under the ``entry`` keyword.
    hook_instance.get_entry.assert_called_once_with(
        entry=TEST_ENTRY_ID, **shared_kwargs
    )
    task_instance.xcom_push.assert_called_once_with(
        key="entry_id", value=TEST_ENTRY_ID
    )
    self.assertEqual(TEST_ENTRY_DICT, returned)
On Mon, Feb 10, 2020 at 2:02 PM Lior Harel <[email protected]> wrote:
>
> We're testing our operators with something along these lines:
>
> with DAG(dag_id='anydag', start_date=datetime.now()) as dag:
> operator = MyOperator(task_id='test_task')
> ti = TaskInstance(task=operator, execution_date=datetime.now())
> context = ti.get_template_context()
> operator.execute(context)
>
>
> MyOperator reads XCom data from its context. Is there a way to set the XCom
> data in the test before executing the operator?
> Is there a simpler way to test the operator without setting up DAG and task
> instance?
>
> Thanks,