Thanks for the suggestion, patching is a good idea. I was hoping there's an option to set the XCom data explicitly without patching & mocking. Just like it's possible to instantiate a task. Is there an option to set the task's input XCom in tests without patching? Thanks,
On Mon, 10 Feb 2020 at 15:09, Kamil Breguła <[email protected]> wrote: > mock.unittest > > Here is my operator: > ``` > class CloudDataCatalogCreateEntryOperator(BaseOperator): > > template_fields = ( > "location", > "entry_group", > "entry_id", > "entry", > "project_id", > "retry", > "timeout", > "metadata", > "gcp_conn_id", > ) > > @apply_defaults > def __init__( > self, > location: str, > entry_group: str, > entry_id: str, > entry: Union[Dict, Entry], > project_id: str = None, > retry: Retry = None, > timeout: float = None, > metadata: Sequence[Tuple[str, str]] = None, > gcp_conn_id: str = "google_cloud_default", > *args, > **kwargs > ) -> None: > super().__init__(*args, **kwargs) > self.location = location > self.entry_group = entry_group > self.entry_id = entry_id > self.entry = entry > self.project_id = project_id > self.retry = retry > self.timeout = timeout > self.metadata = metadata > self.gcp_conn_id = gcp_conn_id > > def execute(self, context: Dict): > hook = CloudDataCatalogHook(gcp_conn_id=self.gcp_conn_id) > try: > result = hook.create_entry( > location=self.location, > entry_group=self.entry_group, > entry_id=self.entry_id, > entry=self.entry, > project_id=self.project_id, > retry=self.retry, > timeout=self.timeout, > metadata=self.metadata, > ) > except AlreadyExists: > self.log.info( > "Entry already exists. Skipping create operation.", > ) > result = hook.get_entry( > location=self.location, > entry_group=self.entry_group, > entry=self.entry_id, > project_id=self.project_id, > retry=self.retry, > timeout=self.timeout, > metadata=self.metadata, > ) > _, _, entry_id = result.name.rpartition("/") > self.log.info("Current entry_id ID: %s", entry_id) > context["task_instance"].xcom_push(key="entry_id", value=entry_id) > return MessageToDict(result) > > Here is my unittest: > @mock.patch( > > "airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook", > **{ > "return_value.create_entry.side_effect": > AlreadyExists(message="message"), > "return_value.get_entry.return_value": TEST_ENTRY, > }, > ) > def test_assert_valid_hook_call_when_exists(self, mock_hook) -> None: > task = CloudDataCatalogCreateEntryOperator( > task_id="task_id", > location=TEST_LOCATION, > entry_group=TEST_ENTRY_GROUP_ID, > entry_id=TEST_ENTRY_ID, > entry=TEST_ENTRY, > project_id=TEST_PROJECT_ID, > retry=TEST_RETRY, > timeout=TEST_TIMEOUT, > metadata=TEST_METADATA, > gcp_conn_id=TEST_GCP_CONN_ID, > ) > ti = mock.MagicMock() > result = task.execute(context={"task_instance": ti}) > mock_hook.assert_called_once_with(gcp_conn_id=TEST_GCP_CONN_ID) > mock_hook.return_value.create_entry.assert_called_once_with( > location=TEST_LOCATION, > entry_group=TEST_ENTRY_GROUP_ID, > entry_id=TEST_ENTRY_ID, > entry=TEST_ENTRY, > project_id=TEST_PROJECT_ID, > retry=TEST_RETRY, > timeout=TEST_TIMEOUT, > metadata=TEST_METADATA, > ) > mock_hook.return_value.get_entry.assert_called_once_with( > location=TEST_LOCATION, > entry_group=TEST_ENTRY_GROUP_ID, > entry=TEST_ENTRY_ID, > project_id=TEST_PROJECT_ID, > retry=TEST_RETRY, > timeout=TEST_TIMEOUT, > metadata=TEST_METADATA, > ) > ti.xcom_push.assert_called_once_with(key="entry_id", > value=TEST_ENTRY_ID) > self.assertEqual(TEST_ENTRY_DICT, result) > > > On Mon, Feb 10, 2020 at 2:02 PM Lior Harel <[email protected]> wrote: > > > > We're testing our operators with something along these lines: > > > > with DAG(dag_id='anydag', start_date=datetime.now()) as dag: > > operator = MyOperator(task_id='test_task') > > ti = TaskInstance(task=operator, execution_date=datetime.now()) > > context = ti.get_template_context() > > operator.execute(context) > > > > > > MyOperator reads XCom data from its context. Is there a way to set the > XCom data in the test before executing the operator? > > Is there a simpler way to test the operator without setting up DAG and > task instance? > > > > Thanks, >
