[GitHub] morgendave closed pull request #4101: [AIRFLOW-3272] Add base grpc hook

2018-11-05 Thread GitBox
morgendave closed pull request #4101: [AIRFLOW-3272] Add base grpc hook
URL: https://github.com/apache/incubator-airflow/pull/4101
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/contrib/hooks/grpc_hook.py 
b/airflow/contrib/hooks/grpc_hook.py
new file mode 100644
index 00..b260847f19
--- /dev/null
+++ b/airflow/contrib/hooks/grpc_hook.py
@@ -0,0 +1,118 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import grpc
+from google import auth as google_auth
+from google.auth import jwt as google_auth_jwt
+from google.auth.transport import grpc as google_auth_transport_grpc
+from google.auth.transport import requests as google_auth_transport_requests
+
+from airflow.hooks.base_hook import BaseHook
+from airflow.exceptions import AirflowConfigException
+
+
+class GrpcHook(BaseHook):
+"""
+General interaction with gRPC servers.
+:param grpc_conn_id: The connection ID to use when fetching connection 
info.
+:type grpc_conn_id: str
+:param interceptors: a list of gRPC interceptor objects which would be 
applied
+to the connected gRPC channle. None by default.
+:type interceptors: a list of gRPC interceptors based on or extends the 
four
+official gRPC interceptors, eg, UnaryUnaryClientInterceptor, 
UnaryStreamClientInterceptor,
+StreamUnaryClientInterceptor, StreamStreamClientInterceptor.
+::param custom_connection_func: The customized connection function to 
return gRPC channel.
+:type custom_connection_func: python callable objects that accept the 
connection as
+its only arg. Could be partial or lambda.
+"""
+
+def __init__(self, grpc_conn_id, interceptors=None, 
custom_connection_func=None):
+self.grpc_conn_id = grpc_conn_id
+self.conn = self.get_connection(self.grpc_conn_id)
+self.extras = self.conn.extra_dejson
+self.interceptors = interceptors if interceptors else []
+self.custom_connection_func = custom_connection_func
+
+def get_conn(self):
+if "://" in self.conn.host:
+base_url = self.conn.host
+else:
+# schema defaults to HTTP
+schema = self.conn.schema if self.conn.schema else "http"
+base_url = schema + "://" + self.conn.host
+
+if self.conn.port:
+base_url = base_url + ":" + str(self.conn.port) + "/"
+
+auth_type = self._get_field("auth_type")
+
+if auth_type == "NO_AUTH":
+channel = grpc.insecure_channel(base_url)
+elif auth_type == "SSL" or auth_type == "TLS":
+credential_file_name = self._get_field("credential_pem_file")
+creds = 
grpc.ssl_channel_credentials(open(credential_file_name).read())
+channel = grpc.secure_channel(base_url, creds)
+elif auth_type == "JWT_GOOGLE":
+credentials, _ = google_auth.default()
+jwt_creds = 
google_auth_jwt.OnDemandCredentials.from_signing_credentials(
+credentials)
+channel = google_auth_transport_grpc.secure_authorized_channel(
+jwt_creds, None, base_url)
+elif auth_type == "OATH_GOOGLE":
+scopes = self._get_field("scopes").split(",")
+credentials, _ = google_auth.default(scopes=scopes)
+request = google_auth_transport_requests.Request()
+channel = google_auth_transport_grpc.secure_authorized_channel(
+credentials, request, base_url)
+elif auth_type == "CUSTOM":
+if not self.custom_connection_func:
+raise AirflowConfigException(
+"Customized connection function not set, not able to 
establish a channel")
+channel = self.custom_connection_func(self.conn)
+
+if self.interceptors:
+for interceptor in self.interceptors:
+channel = grpc.intercept_channel(channel,
+ interceptor)
+
+return channel
+
+def run(self, stub_class, call_func, streaming=False, data={}):
+with self.get_conn() as channel:
+stub = stub_class(channel)
+try:
+response = stub.call_func(**data)
+

[GitHub] morgendave closed pull request #4101: [AIRFLOW-3272] Add base grpc hook

2018-11-05 Thread GitBox
morgendave closed pull request #4101: [AIRFLOW-3272] Add base grpc hook
URL: https://github.com/apache/incubator-airflow/pull/4101
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/contrib/hooks/grpc_hook.py 
b/airflow/contrib/hooks/grpc_hook.py
new file mode 100644
index 00..b260847f19
--- /dev/null
+++ b/airflow/contrib/hooks/grpc_hook.py
@@ -0,0 +1,118 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import grpc
+from google import auth as google_auth
+from google.auth import jwt as google_auth_jwt
+from google.auth.transport import grpc as google_auth_transport_grpc
+from google.auth.transport import requests as google_auth_transport_requests
+
+from airflow.hooks.base_hook import BaseHook
+from airflow.exceptions import AirflowConfigException
+
+
+class GrpcHook(BaseHook):
+"""
+General interaction with gRPC servers.
+:param grpc_conn_id: The connection ID to use when fetching connection 
info.
+:type grpc_conn_id: str
+:param interceptors: a list of gRPC interceptor objects which would be 
applied
+to the connected gRPC channle. None by default.
+:type interceptors: a list of gRPC interceptors based on or extends the 
four
+official gRPC interceptors, eg, UnaryUnaryClientInterceptor, 
UnaryStreamClientInterceptor,
+StreamUnaryClientInterceptor, StreamStreamClientInterceptor.
+::param custom_connection_func: The customized connection function to 
return gRPC channel.
+:type custom_connection_func: python callable objects that accept the 
connection as
+its only arg. Could be partial or lambda.
+"""
+
+def __init__(self, grpc_conn_id, interceptors=None, 
custom_connection_func=None):
+self.grpc_conn_id = grpc_conn_id
+self.conn = self.get_connection(self.grpc_conn_id)
+self.extras = self.conn.extra_dejson
+self.interceptors = interceptors if interceptors else []
+self.custom_connection_func = custom_connection_func
+
+def get_conn(self):
+if "://" in self.conn.host:
+base_url = self.conn.host
+else:
+# schema defaults to HTTP
+schema = self.conn.schema if self.conn.schema else "http"
+base_url = schema + "://" + self.conn.host
+
+if self.conn.port:
+base_url = base_url + ":" + str(self.conn.port) + "/"
+
+auth_type = self._get_field("auth_type")
+
+if auth_type == "NO_AUTH":
+channel = grpc.insecure_channel(base_url)
+elif auth_type == "SSL" or auth_type == "TLS":
+credential_file_name = self._get_field("credential_pem_file")
+creds = 
grpc.ssl_channel_credentials(open(credential_file_name).read())
+channel = grpc.secure_channel(base_url, creds)
+elif auth_type == "JWT_GOOGLE":
+credentials, _ = google_auth.default()
+jwt_creds = 
google_auth_jwt.OnDemandCredentials.from_signing_credentials(
+credentials)
+channel = google_auth_transport_grpc.secure_authorized_channel(
+jwt_creds, None, base_url)
+elif auth_type == "OATH_GOOGLE":
+scopes = self._get_field("scopes").split(",")
+credentials, _ = google_auth.default(scopes=scopes)
+request = google_auth_transport_requests.Request()
+channel = google_auth_transport_grpc.secure_authorized_channel(
+credentials, request, base_url)
+elif auth_type == "CUSTOM":
+if not self.custom_connection_func:
+raise AirflowConfigException(
+"Customized connection function not set, not able to 
establish a channel")
+channel = self.custom_connection_func(self.conn)
+
+if self.interceptors:
+for interceptor in self.interceptors:
+channel = grpc.intercept_channel(channel,
+ interceptor)
+
+return channel
+
+def run(self, stub_class, call_func, streaming=False, data={}):
+with self.get_conn() as channel:
+stub = stub_class(channel)
+try:
+response = stub.call_func(**data)
+

[GitHub] morgendave closed pull request #4101: [AIRFLOW-3272] Add base grpc hook

2018-11-05 Thread GitBox
morgendave closed pull request #4101: [AIRFLOW-3272] Add base grpc hook
URL: https://github.com/apache/incubator-airflow/pull/4101
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/contrib/hooks/grpc_hook.py 
b/airflow/contrib/hooks/grpc_hook.py
new file mode 100644
index 00..b260847f19
--- /dev/null
+++ b/airflow/contrib/hooks/grpc_hook.py
@@ -0,0 +1,118 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import grpc
+from google import auth as google_auth
+from google.auth import jwt as google_auth_jwt
+from google.auth.transport import grpc as google_auth_transport_grpc
+from google.auth.transport import requests as google_auth_transport_requests
+
+from airflow.hooks.base_hook import BaseHook
+from airflow.exceptions import AirflowConfigException
+
+
+class GrpcHook(BaseHook):
+"""
+General interaction with gRPC servers.
+:param grpc_conn_id: The connection ID to use when fetching connection 
info.
+:type grpc_conn_id: str
+:param interceptors: a list of gRPC interceptor objects which would be 
applied
+to the connected gRPC channle. None by default.
+:type interceptors: a list of gRPC interceptors based on or extends the 
four
+official gRPC interceptors, eg, UnaryUnaryClientInterceptor, 
UnaryStreamClientInterceptor,
+StreamUnaryClientInterceptor, StreamStreamClientInterceptor.
+::param custom_connection_func: The customized connection function to 
return gRPC channel.
+:type custom_connection_func: python callable objects that accept the 
connection as
+its only arg. Could be partial or lambda.
+"""
+
+def __init__(self, grpc_conn_id, interceptors=None, 
custom_connection_func=None):
+self.grpc_conn_id = grpc_conn_id
+self.conn = self.get_connection(self.grpc_conn_id)
+self.extras = self.conn.extra_dejson
+self.interceptors = interceptors if interceptors else []
+self.custom_connection_func = custom_connection_func
+
+def get_conn(self):
+if "://" in self.conn.host:
+base_url = self.conn.host
+else:
+# schema defaults to HTTP
+schema = self.conn.schema if self.conn.schema else "http"
+base_url = schema + "://" + self.conn.host
+
+if self.conn.port:
+base_url = base_url + ":" + str(self.conn.port) + "/"
+
+auth_type = self._get_field("auth_type")
+
+if auth_type == "NO_AUTH":
+channel = grpc.insecure_channel(base_url)
+elif auth_type == "SSL" or auth_type == "TLS":
+credential_file_name = self._get_field("credential_pem_file")
+creds = 
grpc.ssl_channel_credentials(open(credential_file_name).read())
+channel = grpc.secure_channel(base_url, creds)
+elif auth_type == "JWT_GOOGLE":
+credentials, _ = google_auth.default()
+jwt_creds = 
google_auth_jwt.OnDemandCredentials.from_signing_credentials(
+credentials)
+channel = google_auth_transport_grpc.secure_authorized_channel(
+jwt_creds, None, base_url)
+elif auth_type == "OATH_GOOGLE":
+scopes = self._get_field("scopes").split(",")
+credentials, _ = google_auth.default(scopes=scopes)
+request = google_auth_transport_requests.Request()
+channel = google_auth_transport_grpc.secure_authorized_channel(
+credentials, request, base_url)
+elif auth_type == "CUSTOM":
+if not self.custom_connection_func:
+raise AirflowConfigException(
+"Customized connection function not set, not able to 
establish a channel")
+channel = self.custom_connection_func(self.conn)
+
+if self.interceptors:
+for interceptor in self.interceptors:
+channel = grpc.intercept_channel(channel,
+ interceptor)
+
+return channel
+
+def run(self, stub_class, call_func, streaming=False, data={}):
+with self.get_conn() as channel:
+stub = stub_class(channel)
+try:
+response = stub.call_func(**data)
+