Thanks for the suggestion; patching is a good idea. However, I was hoping
there is an option to set the XCom data explicitly, without patching and
mocking, just as it's possible to instantiate a task.
Is there an option to set the task's input XCom in tests without patching?
Thanks,

On Mon, 10 Feb 2020 at 15:09, Kamil Breguła <[email protected]>
wrote:

> unittest.mock
>
> Here is my operator:
> ```
> class CloudDataCatalogCreateEntryOperator(BaseOperator):
>
>     template_fields = (
>         "location",
>         "entry_group",
>         "entry_id",
>         "entry",
>         "project_id",
>         "retry",
>         "timeout",
>         "metadata",
>         "gcp_conn_id",
>     )
>
>     @apply_defaults
>     def __init__(
>         self,
>         location: str,
>         entry_group: str,
>         entry_id: str,
>         entry: Union[Dict, Entry],
>         project_id: str = None,
>         retry: Retry = None,
>         timeout: float = None,
>         metadata: Sequence[Tuple[str, str]] = None,
>         gcp_conn_id: str = "google_cloud_default",
>         *args,
>         **kwargs
>     ) -> None:
>         super().__init__(*args, **kwargs)
>         self.location = location
>         self.entry_group = entry_group
>         self.entry_id = entry_id
>         self.entry = entry
>         self.project_id = project_id
>         self.retry = retry
>         self.timeout = timeout
>         self.metadata = metadata
>         self.gcp_conn_id = gcp_conn_id
>
>     def execute(self, context: Dict):
>         hook = CloudDataCatalogHook(gcp_conn_id=self.gcp_conn_id)
>         try:
>             result = hook.create_entry(
>                 location=self.location,
>                 entry_group=self.entry_group,
>                 entry_id=self.entry_id,
>                 entry=self.entry,
>                 project_id=self.project_id,
>                 retry=self.retry,
>                 timeout=self.timeout,
>                 metadata=self.metadata,
>             )
>         except AlreadyExists:
>             self.log.info(
>                 "Entry already exists. Skipping create operation.",
>             )
>             result = hook.get_entry(
>                 location=self.location,
>                 entry_group=self.entry_group,
>                 entry=self.entry_id,
>                 project_id=self.project_id,
>                 retry=self.retry,
>                 timeout=self.timeout,
>                 metadata=self.metadata,
>             )
>         _, _, entry_id = result.name.rpartition("/")
>         self.log.info("Current entry_id ID: %s", entry_id)
>         context["task_instance"].xcom_push(key="entry_id", value=entry_id)
>         return MessageToDict(result)
> ```
>
> Here is my unittest:
>     @mock.patch(
>
> "airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook",
>         **{
>             "return_value.create_entry.side_effect":
> AlreadyExists(message="message"),
>             "return_value.get_entry.return_value": TEST_ENTRY,
>         },
>     )
>     def test_assert_valid_hook_call_when_exists(self, mock_hook) -> None:
>         task = CloudDataCatalogCreateEntryOperator(
>             task_id="task_id",
>             location=TEST_LOCATION,
>             entry_group=TEST_ENTRY_GROUP_ID,
>             entry_id=TEST_ENTRY_ID,
>             entry=TEST_ENTRY,
>             project_id=TEST_PROJECT_ID,
>             retry=TEST_RETRY,
>             timeout=TEST_TIMEOUT,
>             metadata=TEST_METADATA,
>             gcp_conn_id=TEST_GCP_CONN_ID,
>         )
>         ti = mock.MagicMock()
>         result = task.execute(context={"task_instance": ti})
>         mock_hook.assert_called_once_with(gcp_conn_id=TEST_GCP_CONN_ID)
>         mock_hook.return_value.create_entry.assert_called_once_with(
>             location=TEST_LOCATION,
>             entry_group=TEST_ENTRY_GROUP_ID,
>             entry_id=TEST_ENTRY_ID,
>             entry=TEST_ENTRY,
>             project_id=TEST_PROJECT_ID,
>             retry=TEST_RETRY,
>             timeout=TEST_TIMEOUT,
>             metadata=TEST_METADATA,
>         )
>         mock_hook.return_value.get_entry.assert_called_once_with(
>             location=TEST_LOCATION,
>             entry_group=TEST_ENTRY_GROUP_ID,
>             entry=TEST_ENTRY_ID,
>             project_id=TEST_PROJECT_ID,
>             retry=TEST_RETRY,
>             timeout=TEST_TIMEOUT,
>             metadata=TEST_METADATA,
>         )
>         ti.xcom_push.assert_called_once_with(key="entry_id",
> value=TEST_ENTRY_ID)
>         self.assertEqual(TEST_ENTRY_DICT, result)
>
>
> On Mon, Feb 10, 2020 at 2:02 PM Lior Harel <[email protected]> wrote:
> >
> > We're testing our operators with something along these lines:
> >
> > with DAG(dag_id='anydag', start_date=datetime.now()) as dag:
> >     operator = MyOperator(task_id='test_task')
> >     ti = TaskInstance(task=operator, execution_date=datetime.now())
> >     context = ti.get_template_context()
> >     operator.execute(context)
> >
> >
> > MyOperator reads XCom data from its context. Is there a way to set the
> > XCom data in the test before executing the operator?
> > Is there a simpler way to test the operator without setting up a DAG and
> > task instance?
> >
> > Thanks,
>

Reply via email to