Repository: incubator-airflow
Updated Branches:
  refs/heads/master a5f51cc4a -> cc01b7df3


[AIRFLOW-1571] Add AWS Lambda Hook

Closes #2718 from sid88in/feature/aws_lambda_hook2


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/cc01b7df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/cc01b7df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/cc01b7df

Branch: refs/heads/master
Commit: cc01b7df3d3d5d2784b5e72aa282e957c9cf63ad
Parents: a5f51cc
Author: sid.gupta <sid.gu...@glassdoor.com>
Authored: Thu Nov 2 12:53:39 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Thu Nov 2 12:53:39 2017 +0100

----------------------------------------------------------------------
 airflow/contrib/hooks/aws_lambda_hook.py    | 62 ++++++++++++++++++
 tests/contrib/hooks/test_aws_lambda_hook.py | 80 ++++++++++++++++++++++++
 2 files changed, 142 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cc01b7df/airflow/contrib/hooks/aws_lambda_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/aws_lambda_hook.py b/airflow/contrib/hooks/aws_lambda_hook.py
new file mode 100644
index 0000000..bcd1c7f
--- /dev/null
+++ b/airflow/contrib/hooks/aws_lambda_hook.py
@@ -0,0 +1,62 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow.contrib.hooks.aws_hook import AwsHook
+
+
+class AwsLambdaHook(AwsHook):
+    """
+    Interact with AWS Lambda
+
+    :param function_name: AWS Lambda Function Name
+    :type function_name: str
+    :param region_name: AWS Region Name (example: us-west-2)
+    :type region_name: str
+    :param log_type: Tail Invocation Request
+    :type log_type: str
+    :param qualifier: AWS Lambda Function Version or Alias Name
+    :type qualifier: str
+    :param invocation_type: AWS Lambda Invocation Type (RequestResponse, Event etc)
+    :type invocation_type: str
+    """
+
+    def __init__(self, function_name, region_name=None, log_type='None', qualifier='$LATEST',
+                 invocation_type='RequestResponse', *args, **kwargs):
+        self.function_name = function_name
+        self.region_name = region_name
+        self.log_type = log_type
+        self.invocation_type = invocation_type
+        self.qualifier = qualifier
+        super(AwsLambdaHook, self).__init__(*args, **kwargs)
+
+    def get_conn(self):
+        self.conn = self.get_client_type('lambda', self.region_name)
+        return self.conn
+
+    def invoke_lambda(self, payload):
+        """
+        Invoke Lambda Function
+        """
+
+        awslambda_conn = self.get_conn()
+
+        response = awslambda_conn.invoke(
+            FunctionName=self.function_name,
+            InvocationType=self.invocation_type,
+            LogType=self.log_type,
+            Payload=payload,
+            Qualifier=self.qualifier
+        )
+
+        return response

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cc01b7df/tests/contrib/hooks/test_aws_lambda_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_aws_lambda_hook.py b/tests/contrib/hooks/test_aws_lambda_hook.py
new file mode 100644
index 0000000..eb11125
--- /dev/null
+++ b/tests/contrib/hooks/test_aws_lambda_hook.py
@@ -0,0 +1,80 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+import io
+import json
+import textwrap
+import zipfile
+import base64
+
+from airflow.contrib.hooks.aws_lambda_hook import AwsLambdaHook
+
+try:
+    from moto import mock_lambda
+except ImportError:
+    mock_lambda = None
+
+
+class TestAwsLambdaHook(unittest.TestCase):
+
+    @unittest.skipIf(mock_lambda is None, 'mock_lambda package not present')
+    @mock_lambda
+    def test_get_conn_returns_a_boto3_connection(self):
+        hook = AwsLambdaHook(aws_conn_id='aws_default',
+                             function_name="test_function", region_name="us-east-1")
+        self.assertIsNotNone(hook.get_conn())
+
+    def lambda_function(self):
+        code = textwrap.dedent("""
+def lambda_handler(event, context):
+    return event
+        """)
+        zip_output = io.BytesIO()
+        zip_file = zipfile.ZipFile(zip_output, 'w', zipfile.ZIP_DEFLATED)
+        zip_file.writestr('lambda_function.zip', code)
+        zip_file.close()
+        zip_output.seek(0)
+        return zip_output.read()
+
+    @unittest.skipIf(mock_lambda is None, 'mock_lambda package not present')
+    @mock_lambda
+    def test_invoke_lambda_function(self):
+
+        hook = AwsLambdaHook(aws_conn_id='aws_default',
+                             function_name="test_function", region_name="us-east-1")
+
+        hook.get_conn().create_function(
+            FunctionName='test_function',
+            Runtime='python2.7',
+            Role='test-iam-role',
+            Handler='lambda_function.lambda_handler',
+            Code={
+                'ZipFile': self.lambda_function(),
+            },
+            Description='test lambda function',
+            Timeout=3,
+            MemorySize=128,
+            Publish=True,
+        )
+
+        input = {'hello': 'airflow'}
+        response = hook.invoke_lambda(payload=json.dumps(input))
+
+        self.assertEquals(response["StatusCode"], 202)
+
+
+if __name__ == '__main__':
+    unittest.main()

Reply via email to