nazarsh opened a new issue #16590:
URL: https://github.com/apache/airflow/issues/16590


   **Apache Airflow version**: 2.1.0
   
   
   **Kubernetes version (if you are using kubernetes)** (use `kubectl version`): N/A
   
   **Environment**:
   
   - **Cloud provider or hardware configuration**: Astronomer-based local setup using Docker `quay.io/astronomer/ap-airflow:2.1.0-2-buster-onbuild`
   - **OS** (e.g. from /etc/os-release): `Debian GNU/Linux 10 (buster)`
   - **Kernel** (e.g. `uname -a`): `Linux 7a92d1fd4406 5.10.25-linuxkit #1 SMP Tue Mar 23 09:27:39 UTC 2021 x86_64 GNU/Linux`
   - **Install tools**: apache-airflow-providers-snowflake
   - **Others**:
   
   **What happened**:
   
   Having configured a Snowflake connection and pointed the secrets backend at GCP Secret Manager (`AIRFLOW__SECRETS__BACKEND=airflow.providers.google.cloud.secrets.secret_manager.CloudSecretManagerBackend`), I am getting a very consistent error traced all the way down to gRPC:
   
   ```
    File "/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 57, in error_remapped_callable
        return callable_(*args, **kwargs)
      File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 946, in __call__
        return _end_unary_response_blocking(state, call, False, None)
      File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
        raise _InactiveRpcError(state)
    grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
         status = StatusCode.UNKNOWN
         details = "Stream removed"
         debug_error_string = "{"created":"@1624370913.481874500","description":"Error received from peer ipv4:172.xxx.xx.xxx:443","file":"src/core/lib/surface/call.cc","file_line":1067,"grpc_message":"Stream removed","grpc_status":2}"
    >

    The above exception was the direct cause of the following exception:

    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1137, in _run_raw_task
        self._prepare_and_execute_task_with_callbacks(context, task)
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
        result = self._execute_task(context, task_copy)
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
        result = task_copy.execute(context=context)
      File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 150, in execute
        return_value = self.execute_callable()
      File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 161, in execute_callable
        return self.python_callable(*self.op_args, **self.op_kwargs)
      File "/usr/local/airflow/dags/qe/weekly.py", line 63, in snfk_hook
        df = hook.get_pandas_df(sql)
      File "/usr/local/lib/python3.7/site-packages/airflow/hooks/dbapi.py", line 116, in get_pandas_df
        with closing(self.get_conn()) as conn:
      File "/usr/local/lib/python3.7/site-packages/airflow/providers/snowflake/hooks/snowflake.py", line 220, in get_conn
        conn_config = self._get_conn_params()
      File "/usr/local/lib/python3.7/site-packages/airflow/providers/snowflake/hooks/snowflake.py", line 152, in _get_conn_params
        self.snowflake_conn_id  # type: ignore[attr-defined] # pylint: disable=no-member
      File "/usr/local/lib/python3.7/site-packages/airflow/hooks/base.py", line 67, in get_connection
        conn = Connection.get_connection_from_secrets(conn_id)
      File "/usr/local/lib/python3.7/site-packages/airflow/models/connection.py", line 376, in get_connection_from_secrets
        conn = secrets_backend.get_connection(conn_id=conn_id)
      File "/usr/local/lib/python3.7/site-packages/airflow/secrets/base_secrets.py", line 64, in get_connection
        conn_uri = self.get_conn_uri(conn_id=conn_id)
      File "/usr/local/lib/python3.7/site-packages/airflow/providers/google/cloud/secrets/secret_manager.py", line 134, in get_conn_uri
        return self._get_secret(self.connections_prefix, conn_id)
      File "/usr/local/lib/python3.7/site-packages/airflow/providers/google/cloud/secrets/secret_manager.py", line 170, in _get_secret
        return self.client.get_secret(secret_id=secret_id, project_id=self.project_id)
      File "/usr/local/lib/python3.7/site-packages/airflow/providers/google/cloud/_internal_client/secret_manager_client.py", line 86, in get_secret
        response = self.client.access_secret_version(name)
      File "/usr/local/lib/python3.7/site-packages/google/cloud/secretmanager_v1/gapic/secret_manager_service_client.py", line 968, in access_secret_version
        request, retry=retry, timeout=timeout, metadata=metadata
      File "/usr/local/lib/python3.7/site-packages/google/api_core/gapic_v1/method.py", line 145, in __call__
        return wrapped_func(*args, **kwargs)
      File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 286, in retry_wrapped_func
        on_error=on_error,
      File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 184, in retry_target
        return target()
      File "/usr/local/lib/python3.7/site-packages/google/api_core/timeout.py", line 214, in func_with_timeout
        return func(*args, **kwargs)
      File "/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 59, in error_remapped_callable
        six.raise_from(exceptions.from_grpc_error(exc), exc)
      File "<string>", line 3, in raise_from
    google.api_core.exceptions.Unknown: None Stream removed
   ```
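
   For context, the last Airflow frame above hands off to the Secret Manager client, so the failure can likely be reproduced outside Airflow. Below is a minimal sketch of that call path, assuming the older `secretmanager_v1` GAPIC client shown in the traceback; the project id is a placeholder, and the secret name mirrors the backend's default `airflow-connections-<conn_id>` naming:

   ```python
    # Minimal sketch of the failing call path, isolated from Airflow.
    # Assumes the pre-microgenerator GAPIC SecretManagerServiceClient seen in
    # the traceback; "my-gcp-project" is a placeholder project id.
    from google.cloud import secretmanager_v1

    client = secretmanager_v1.SecretManagerServiceClient()
    # With default backend settings the secret id is "airflow-connections-snowflake_conn".
    name = "projects/my-gcp-project/secrets/airflow-connections-snowflake_conn/versions/latest"
    # This is the call that dies with Unknown: "Stream removed" in the worker.
    response = client.access_secret_version(name)
    print(response.payload.data.decode("utf-8"))
   ```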
   
   **What you expected to happen**:
   
   The DAG successfully retrieves the configured Snowflake connection from GCP Secret Manager and executes a query that returns a result.
   
   **How to reproduce it**:
   1. Configure Google Cloud Platform as the secrets backend: `AIRFLOW__SECRETS__BACKEND=airflow.providers.google.cloud.secrets.secret_manager.CloudSecretManagerBackend`
   2. Configure a Snowflake connection (`requirements.txt` has `apache-airflow-providers-snowflake`).
   3. Create a DAG that uses the SnowflakeHook, similar to this:
   
   ```python
   import logging
   
   import airflow
   from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
    from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
   
   logging.basicConfig(level=logging.INFO)
   logger = logging.getLogger(__name__)
   
   args = {"owner": "Airflow", "start_date": airflow.utils.dates.days_ago(2)}
   
   dag = DAG(
       dag_id="snowflake_automation", default_args=args, schedule_interval=None
   )
   
    snowflake_query = [
        """create table public.test_employee (id number, name string);""",
        """insert into public.test_employee values (1, 'Sam'), (2, 'Andy'), (3, 'Gill');""",
    ]
   
   
    def get_row_count(**context):
        dwh_hook = SnowflakeHook(snowflake_conn_id="snowflake_conn")
        result = dwh_hook.get_first("select count(*) from public.test_employee")
        logger.info("Number of rows in `public.test_employee` - %s", result[0])
   
    with dag:
        create_insert = SnowflakeOperator(
            task_id="snowflake_create",
            sql=snowflake_query,
            snowflake_conn_id="snowflake_conn",
        )

        get_count = PythonOperator(task_id="get_count", python_callable=get_row_count)

        create_insert >> get_count
   ```
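
   To confirm the failure is in the secrets lookup itself rather than in the Snowflake hook, the backend can also be exercised directly. A minimal sketch, assuming default backend settings and the `snowflake_conn` conn_id from the DAG above:

   ```python
    # Exercise the secrets backend directly, bypassing the hook and operator.
    from airflow.providers.google.cloud.secrets.secret_manager import CloudSecretManagerBackend

    backend = CloudSecretManagerBackend()
    # With the defaults this reads the secret "airflow-connections-snowflake_conn".
    print(backend.get_conn_uri(conn_id="snowflake_conn"))
   ```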
   
   **Anything else we need to know**:
   
   I looked around to see whether this is an issue with Google's `api-core`, and it seems somebody has already researched it and concluded that it is likely a downstream implementation issue rather than an `api-core` issue: https://stackoverflow.com/questions/67374613/why-does-accessing-this-variable-fail-after-it-is-used-in-a-thread
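
   If that analysis is right and the gRPC channel is being created in one process or thread and then reused in another, a possible (untested) workaround is to build a fresh client inside the task process instead of reusing a cached one. A hedged sketch; `fetch_conn_uri_fresh` is a hypothetical helper, not Airflow API:

   ```python
    # Hypothetical workaround sketch: a fresh backend instance creates its own
    # gRPC channel in the current process, avoiding any channel that was
    # created before the worker forked.
    from airflow.providers.google.cloud.secrets.secret_manager import CloudSecretManagerBackend

    def fetch_conn_uri_fresh(conn_id: str) -> str:
        return CloudSecretManagerBackend().get_conn_uri(conn_id=conn_id)
   ```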
   

