[GitHub] morgendave closed pull request #4101: [AIRFLOW-3272] Add base grpc hook
morgendave closed pull request #4101: [AIRFLOW-3272] Add base grpc hook URL: https://github.com/apache/incubator-airflow/pull/4101 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/airflow/contrib/hooks/grpc_hook.py b/airflow/contrib/hooks/grpc_hook.py new file mode 100644 index 00..b260847f19 --- /dev/null +++ b/airflow/contrib/hooks/grpc_hook.py @@ -0,0 +1,118 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import grpc +from google import auth as google_auth +from google.auth import jwt as google_auth_jwt +from google.auth.transport import grpc as google_auth_transport_grpc +from google.auth.transport import requests as google_auth_transport_requests + +from airflow.hooks.base_hook import BaseHook +from airflow.exceptions import AirflowConfigException + + +class GrpcHook(BaseHook): +""" +General interaction with gRPC servers. +:param grpc_conn_id: The connection ID to use when fetching connection info. +:type grpc_conn_id: str +:param interceptors: a list of gRPC interceptor objects which would be applied +to the connected gRPC channle. None by default. +:type interceptors: a list of gRPC interceptors based on or extends the four +official gRPC interceptors, eg, UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor, +StreamUnaryClientInterceptor, StreamStreamClientInterceptor. +::param custom_connection_func: The customized connection function to return gRPC channel. +:type custom_connection_func: python callable objects that accept the connection as +its only arg. Could be partial or lambda. +""" + +def __init__(self, grpc_conn_id, interceptors=None, custom_connection_func=None): +self.grpc_conn_id = grpc_conn_id +self.conn = self.get_connection(self.grpc_conn_id) +self.extras = self.conn.extra_dejson +self.interceptors = interceptors if interceptors else [] +self.custom_connection_func = custom_connection_func + +def get_conn(self): +if "://" in self.conn.host: +base_url = self.conn.host +else: +# schema defaults to HTTP +schema = self.conn.schema if self.conn.schema else "http" +base_url = schema + "://" + self.conn.host + +if self.conn.port: +base_url = base_url + ":" + str(self.conn.port) + "/" + +auth_type = self._get_field("auth_type") + +if auth_type == "NO_AUTH": +channel = grpc.insecure_channel(base_url) +elif auth_type == "SSL" or auth_type == "TLS": +credential_file_name = self._get_field("credential_pem_file") +creds = grpc.ssl_channel_credentials(open(credential_file_name).read()) +channel = grpc.secure_channel(base_url, creds) +elif auth_type == "JWT_GOOGLE": +credentials, _ = google_auth.default() +jwt_creds = google_auth_jwt.OnDemandCredentials.from_signing_credentials( +credentials) +channel = google_auth_transport_grpc.secure_authorized_channel( +jwt_creds, None, base_url) +elif auth_type == "OATH_GOOGLE": +scopes = self._get_field("scopes").split(",") +credentials, _ = google_auth.default(scopes=scopes) +request = google_auth_transport_requests.Request() +channel = google_auth_transport_grpc.secure_authorized_channel( +credentials, request, base_url) +elif auth_type == "CUSTOM": +if not self.custom_connection_func: +raise AirflowConfigException( +"Customized connection function not set, not able to establish a channel") +channel = self.custom_connection_func(self.conn) + +if self.interceptors: +for interceptor in self.interceptors: +channel = grpc.intercept_channel(channel, + interceptor) + +return channel + +def run(self, stub_class, call_func, streaming=False, data={}): +with self.get_conn() as channel: +stub = stub_class(channel) +try: +response = stub.call_func(**data) +
[GitHub] morgendave closed pull request #4101: [AIRFLOW-3272] Add base grpc hook
morgendave closed pull request #4101: [AIRFLOW-3272] Add base grpc hook URL: https://github.com/apache/incubator-airflow/pull/4101 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/airflow/contrib/hooks/grpc_hook.py b/airflow/contrib/hooks/grpc_hook.py new file mode 100644 index 00..b260847f19 --- /dev/null +++ b/airflow/contrib/hooks/grpc_hook.py @@ -0,0 +1,118 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import grpc +from google import auth as google_auth +from google.auth import jwt as google_auth_jwt +from google.auth.transport import grpc as google_auth_transport_grpc +from google.auth.transport import requests as google_auth_transport_requests + +from airflow.hooks.base_hook import BaseHook +from airflow.exceptions import AirflowConfigException + + +class GrpcHook(BaseHook): +""" +General interaction with gRPC servers. +:param grpc_conn_id: The connection ID to use when fetching connection info. +:type grpc_conn_id: str +:param interceptors: a list of gRPC interceptor objects which would be applied +to the connected gRPC channle. None by default. +:type interceptors: a list of gRPC interceptors based on or extends the four +official gRPC interceptors, eg, UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor, +StreamUnaryClientInterceptor, StreamStreamClientInterceptor. +::param custom_connection_func: The customized connection function to return gRPC channel. +:type custom_connection_func: python callable objects that accept the connection as +its only arg. Could be partial or lambda. +""" + +def __init__(self, grpc_conn_id, interceptors=None, custom_connection_func=None): +self.grpc_conn_id = grpc_conn_id +self.conn = self.get_connection(self.grpc_conn_id) +self.extras = self.conn.extra_dejson +self.interceptors = interceptors if interceptors else [] +self.custom_connection_func = custom_connection_func + +def get_conn(self): +if "://" in self.conn.host: +base_url = self.conn.host +else: +# schema defaults to HTTP +schema = self.conn.schema if self.conn.schema else "http" +base_url = schema + "://" + self.conn.host + +if self.conn.port: +base_url = base_url + ":" + str(self.conn.port) + "/" + +auth_type = self._get_field("auth_type") + +if auth_type == "NO_AUTH": +channel = grpc.insecure_channel(base_url) +elif auth_type == "SSL" or auth_type == "TLS": +credential_file_name = self._get_field("credential_pem_file") +creds = grpc.ssl_channel_credentials(open(credential_file_name).read()) +channel = grpc.secure_channel(base_url, creds) +elif auth_type == "JWT_GOOGLE": +credentials, _ = google_auth.default() +jwt_creds = google_auth_jwt.OnDemandCredentials.from_signing_credentials( +credentials) +channel = google_auth_transport_grpc.secure_authorized_channel( +jwt_creds, None, base_url) +elif auth_type == "OATH_GOOGLE": +scopes = self._get_field("scopes").split(",") +credentials, _ = google_auth.default(scopes=scopes) +request = google_auth_transport_requests.Request() +channel = google_auth_transport_grpc.secure_authorized_channel( +credentials, request, base_url) +elif auth_type == "CUSTOM": +if not self.custom_connection_func: +raise AirflowConfigException( +"Customized connection function not set, not able to establish a channel") +channel = self.custom_connection_func(self.conn) + +if self.interceptors: +for interceptor in self.interceptors: +channel = grpc.intercept_channel(channel, + interceptor) + +return channel + +def run(self, stub_class, call_func, streaming=False, data={}): +with self.get_conn() as channel: +stub = stub_class(channel) +try: +response = stub.call_func(**data) +
[GitHub] morgendave closed pull request #4101: [AIRFLOW-3272] Add base grpc hook
morgendave closed pull request #4101: [AIRFLOW-3272] Add base grpc hook URL: https://github.com/apache/incubator-airflow/pull/4101 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/airflow/contrib/hooks/grpc_hook.py b/airflow/contrib/hooks/grpc_hook.py new file mode 100644 index 00..b260847f19 --- /dev/null +++ b/airflow/contrib/hooks/grpc_hook.py @@ -0,0 +1,118 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import grpc +from google import auth as google_auth +from google.auth import jwt as google_auth_jwt +from google.auth.transport import grpc as google_auth_transport_grpc +from google.auth.transport import requests as google_auth_transport_requests + +from airflow.hooks.base_hook import BaseHook +from airflow.exceptions import AirflowConfigException + + +class GrpcHook(BaseHook): +""" +General interaction with gRPC servers. +:param grpc_conn_id: The connection ID to use when fetching connection info. +:type grpc_conn_id: str +:param interceptors: a list of gRPC interceptor objects which would be applied +to the connected gRPC channle. None by default. +:type interceptors: a list of gRPC interceptors based on or extends the four +official gRPC interceptors, eg, UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor, +StreamUnaryClientInterceptor, StreamStreamClientInterceptor. +::param custom_connection_func: The customized connection function to return gRPC channel. +:type custom_connection_func: python callable objects that accept the connection as +its only arg. Could be partial or lambda. +""" + +def __init__(self, grpc_conn_id, interceptors=None, custom_connection_func=None): +self.grpc_conn_id = grpc_conn_id +self.conn = self.get_connection(self.grpc_conn_id) +self.extras = self.conn.extra_dejson +self.interceptors = interceptors if interceptors else [] +self.custom_connection_func = custom_connection_func + +def get_conn(self): +if "://" in self.conn.host: +base_url = self.conn.host +else: +# schema defaults to HTTP +schema = self.conn.schema if self.conn.schema else "http" +base_url = schema + "://" + self.conn.host + +if self.conn.port: +base_url = base_url + ":" + str(self.conn.port) + "/" + +auth_type = self._get_field("auth_type") + +if auth_type == "NO_AUTH": +channel = grpc.insecure_channel(base_url) +elif auth_type == "SSL" or auth_type == "TLS": +credential_file_name = self._get_field("credential_pem_file") +creds = grpc.ssl_channel_credentials(open(credential_file_name).read()) +channel = grpc.secure_channel(base_url, creds) +elif auth_type == "JWT_GOOGLE": +credentials, _ = google_auth.default() +jwt_creds = google_auth_jwt.OnDemandCredentials.from_signing_credentials( +credentials) +channel = google_auth_transport_grpc.secure_authorized_channel( +jwt_creds, None, base_url) +elif auth_type == "OATH_GOOGLE": +scopes = self._get_field("scopes").split(",") +credentials, _ = google_auth.default(scopes=scopes) +request = google_auth_transport_requests.Request() +channel = google_auth_transport_grpc.secure_authorized_channel( +credentials, request, base_url) +elif auth_type == "CUSTOM": +if not self.custom_connection_func: +raise AirflowConfigException( +"Customized connection function not set, not able to establish a channel") +channel = self.custom_connection_func(self.conn) + +if self.interceptors: +for interceptor in self.interceptors: +channel = grpc.intercept_channel(channel, + interceptor) + +return channel + +def run(self, stub_class, call_func, streaming=False, data={}): +with self.get_conn() as channel: +stub = stub_class(channel) +try: +response = stub.call_func(**data) +