nazarsh opened a new issue #16590:
URL: https://github.com/apache/airflow/issues/16590
**Apache Airflow version**: 2.1.0
**Kubernetes version (if you are using kubernetes)** (use `kubectl
version`): N/A
**Environment**:
- **Cloud provider or hardware configuration**: Astronomer-based local setup
using Docker `quay.io/astronomer/ap-airflow:2.1.0-2-buster-onbuild`
- **OS** (e.g. from /etc/os-release): `Debian GNU/Linux 10 (buster)`
- **Kernel** (e.g. `uname -a`): `Linux 7a92d1fd4406 5.10.25-linuxkit #1 SMP
Tue Mar 23 09:27:39 UTC 2021 x86_64 GNU/Linux`
- **Install tools**: apache-airflow-providers-snowflake
- **Others**:
**What happened**:
Having configured a Snowflake connection and pointed Airflow at the GCP Secret Manager backend
`AIRFLOW__SECRETS__BACKEND=airflow.providers.google.cloud.secrets.secret_manager.CloudSecretManagerBackend`,
I consistently get an error that traces all the way down to gRPC:
```
  File "/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 57, in error_remapped_callable
    return callable_(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 946, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.UNKNOWN
    details = "Stream removed"
    debug_error_string = "{"created":"@1624370913.481874500","description":"Error received from peer ipv4:172.xxx.xx.xxx:443","file":"src/core/lib/surface/call.cc","file_line":1067,"grpc_message":"Stream removed","grpc_status":2}"
>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1137, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 150, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 161, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/usr/local/airflow/dags/qe/weekly.py", line 63, in snfk_hook
    df = hook.get_pandas_df(sql)
  File "/usr/local/lib/python3.7/site-packages/airflow/hooks/dbapi.py", line 116, in get_pandas_df
    with closing(self.get_conn()) as conn:
  File "/usr/local/lib/python3.7/site-packages/airflow/providers/snowflake/hooks/snowflake.py", line 220, in get_conn
    conn_config = self._get_conn_params()
  File "/usr/local/lib/python3.7/site-packages/airflow/providers/snowflake/hooks/snowflake.py", line 152, in _get_conn_params
    self.snowflake_conn_id  # type: ignore[attr-defined]  # pylint: disable=no-member
  File "/usr/local/lib/python3.7/site-packages/airflow/hooks/base.py", line 67, in get_connection
    conn = Connection.get_connection_from_secrets(conn_id)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/connection.py", line 376, in get_connection_from_secrets
    conn = secrets_backend.get_connection(conn_id=conn_id)
  File "/usr/local/lib/python3.7/site-packages/airflow/secrets/base_secrets.py", line 64, in get_connection
    conn_uri = self.get_conn_uri(conn_id=conn_id)
  File "/usr/local/lib/python3.7/site-packages/airflow/providers/google/cloud/secrets/secret_manager.py", line 134, in get_conn_uri
    return self._get_secret(self.connections_prefix, conn_id)
  File "/usr/local/lib/python3.7/site-packages/airflow/providers/google/cloud/secrets/secret_manager.py", line 170, in _get_secret
    return self.client.get_secret(secret_id=secret_id, project_id=self.project_id)
  File "/usr/local/lib/python3.7/site-packages/airflow/providers/google/cloud/_internal_client/secret_manager_client.py", line 86, in get_secret
    response = self.client.access_secret_version(name)
  File "/usr/local/lib/python3.7/site-packages/google/cloud/secretmanager_v1/gapic/secret_manager_service_client.py", line 968, in access_secret_version
    request, retry=retry, timeout=timeout, metadata=metadata
  File "/usr/local/lib/python3.7/site-packages/google/api_core/gapic_v1/method.py", line 145, in __call__
    return wrapped_func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 286, in retry_wrapped_func
    on_error=on_error,
  File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 184, in retry_target
    return target()
  File "/usr/local/lib/python3.7/site-packages/google/api_core/timeout.py", line 214, in func_with_timeout
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 59, in error_remapped_callable
    six.raise_from(exceptions.from_grpc_error(exc), exc)
  File "<string>", line 3, in raise_from
google.api_core.exceptions.Unknown: None Stream removed
```
**What you expected to happen**:
The DAG successfully retrieves the configured Snowflake connection from GCP Secret Manager and executes a query that returns a result.
**How to reproduce it**:
1. Configure Google Cloud Platform as the secrets backend:
`AIRFLOW__SECRETS__BACKEND=airflow.providers.google.cloud.secrets.secret_manager.CloudSecretManagerBackend`
2. Configure a Snowflake connection (`requirements.txt` includes
`apache-airflow-providers-snowflake`); a sketch of the expected secret layout follows the DAG below.
3. Create a DAG that uses `SnowflakeHook`, similar to this:
```python
import logging

import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

args = {"owner": "Airflow", "start_date": airflow.utils.dates.days_ago(2)}

dag = DAG(
    dag_id="snowflake_automation", default_args=args, schedule_interval=None
)

snowflake_query = [
    """create table public.test_employee (id number, name string);""",
    """insert into public.test_employee values (1, 'Sam'), (2, 'Andy'), (3, 'Gill');""",
]


def get_row_count(**context):
    dwh_hook = SnowflakeHook(snowflake_conn_id="snowflake_conn")
    result = dwh_hook.get_first("select count(*) from public.test_employee")
    logging.info("Number of rows in `public.test_employee` - %s", result[0])


with dag:
    create_insert = SnowflakeOperator(
        task_id="snowflake_create",
        sql=snowflake_query,
        snowflake_conn_id="snowflake_conn",
    )
    get_count = PythonOperator(task_id="get_count", python_callable=get_row_count)

    create_insert >> get_count
```
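
For reference, here is a minimal sketch of the secret layout the backend expects (project and secret names below are placeholders, not taken from my setup): with the backend's defaults (`connections_prefix="airflow-connections"`, `sep="-"`), the `snowflake_conn` connection should live in a secret named `airflow-connections-snowflake_conn` whose payload is an Airflow connection URI. These defaults can be tuned via `AIRFLOW__SECRETS__BACKEND_KWARGS`.

```python
# Illustrative only; the project ID, secret name, and URI are hypothetical.
from google.cloud import secretmanager

client = secretmanager.SecretManagerServiceClient()
# The backend resolves "<connections_prefix><sep><conn_id>" as the secret name.
name = "projects/my-gcp-project/secrets/airflow-connections-snowflake_conn/versions/latest"
payload = client.access_secret_version(name=name).payload.data.decode("UTF-8")
print(payload)  # e.g. snowflake://user:password@/?account=xy12345&region=us-east-1
```

Reading the secret directly like this succeeds, which suggests the credentials and secret layout themselves are fine.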
**Anything else we need to know**:
I looked around to see whether this is an issue with Google's `api-core`, and it seems somebody has already dug into it and concluded that it is likely a downstream implementation issue rather than an `api-core` issue:
https://stackoverflow.com/questions/67374613/why-does-accessing-this-variable-fail-after-it-is-used-in-a-thread
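
In case it helps triage, here is a minimal, hypothetical sketch (names are made up, not from my environment) of the failure mode that post seems to describe: a gRPC channel created in one process and reused after a fork, which gRPC does not support and which can surface as `Stream removed`. As far as I can tell, Airflow's standard task runner forks a child process to run the task, so the secrets-backend client may be hitting the same pattern:

```python
# Hypothetical sketch, assuming the fork-safety explanation is correct.
# The project and secret names are placeholders.
import os

from google.cloud import secretmanager

client = secretmanager.SecretManagerServiceClient()  # channel created in the parent
name = "projects/my-gcp-project/secrets/airflow-connections-snowflake_conn/versions/latest"

# Works in the parent process, where the channel was created.
client.access_secret_version(name=name)

pid = os.fork()
if pid == 0:
    # Child process (roughly what a forking task runner does): the inherited
    # gRPC channel is in an undefined state, so this call can fail with
    # grpc._channel._InactiveRpcError ... "Stream removed".
    client.access_secret_version(name=name)
    os._exit(0)
os.waitpid(pid, 0)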