[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-05-10 Thread GitBox


feluelle commented on a change in pull request #6007:
URL: https://github.com/apache/airflow/pull/6007#discussion_r422639912



##
File path: airflow/providers/amazon/aws/hooks/glue.py
##
@@ -0,0 +1,180 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import time
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class AwsGlueJobHook(AwsBaseHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type job_name: Optional[str]
+:param desc: job description
+:type desc: Optional[str]
+:param concurrent_run_limit: The maximum number of concurrent runs allowed 
for a job
+:type concurrent_run_limit: int
+:param script_location: path to etl script on s3
+:type script_location: Optional[str]
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type retry_limit: int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type num_of_dpus: int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: Optional[str]
+:param iam_role_name: AWS IAM Role for Glue Job
+:type iam_role_name: Optional[str]
+:param s3_bucket: S3 bucket where logs and local etl script will be 
uploaded
+:type s3_bucket: Optional[str]
+"""
+JOB_POLL_INTERVAL = 6  # polls job status after every JOB_POLL_INTERVAL 
seconds
+
+def __init__(self,
+ s3_bucket=None,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=1,
+ script_location=None,
+ retry_limit=0,
+ num_of_dpus=10,
+ region_name=None,
+ iam_role_name=None,
+ *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit
+self.script_location = script_location
+self.retry_limit = retry_limit
+self.num_of_dpus = num_of_dpus
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.role_name = iam_role_name
+self.s3_glue_logs = 'logs/glue-logs/'
+super(AwsGlueJobHook, self).__init__(client_type='glue', *args, 
**kwargs)
+
+def get_conn(self):
+"""
+:return: connection
+"""
+conn = self.get_client_type('glue', self.region_name)
+return conn

Review comment:
   You don't need this anymore due to 
https://github.com/apache/airflow/pull/7541
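   For context, a rough sketch of what that change provides (hedged: this assumes the post-#7541 `AwsBaseHook`, which accepts a `client_type` and implements `get_conn()` itself):

   ```python
   # Sketch only: with apache/airflow#7541, AwsBaseHook builds the boto3
   # client from client_type, so the subclass needs no get_conn() override.
   from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook


   class AwsGlueJobHook(AwsBaseHook):
       def __init__(self, *args, **kwargs):
           super().__init__(client_type='glue', *args, **kwargs)

   # hook = AwsGlueJobHook(aws_conn_id='aws_default')
   # hook.get_conn() now returns the boto3 'glue' client via the base hook.
   ```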

##
File path: tests/providers/amazon/aws/hooks/test_aws_glue_job_hook.py
##
@@ -0,0 +1,107 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+import unittest
+
+import mock
+
+from airflow.providers.amazon.aws.hooks.glue import AwsGlueJobHook
+
+try:
+    from moto import mock_iam
+except ImportError:
+    mock_iam = None
+
+
+class TestGlueJobHook(unittest.TestCase):
+    def setUp(self):
+        self.some_aws_region = "us-west-2"
+
+    @mock.patch.object(AwsGlueJobHook, 'get_conn')
+    def test_get_conn_returns_a_boto3_connection(self, mock_get_conn):
+        hook = AwsGlueJobHook(job_name='aws_test_glue_job', s3_bucket='some_bucket')
+        self.assertIsNotNone(hook.get_conn())

Review comment:
   This test can then also be removed!





[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-14 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS 
Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r407932537
 
 

 ##
 File path: airflow/providers/amazon/aws/operators/glue.py
 ##
 @@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import unicode_literals
+
+from airflow.providers.amazon.aws.hooks.glue import AwsGlueJobHook
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+import os.path
+
+
+class AWSGlueJobOperator(BaseOperator):
+"""
+Creates an AWS Glue Job. AWS Glue is a serverless Spark
+ETL service for running Spark Jobs on the AWS cloud.
+Language support: Python and Scala
+
+:param job_name: unique job name per AWS Account
+:type job_name: Optional[str]
+:param script_location: location of ETL script. Must be a local or S3 path
+:type script_location: Optional[str]
+:param job_desc: job description details
+:type job_desc: Optional[str]
+:param concurrent_run_limit: The maximum number of concurrent runs allowed 
for a job
+:type concurrent_run_limit: Optional[int]
+:param script_args: etl script arguments and AWS Glue arguments
+:type script_args: dict
+:param connections: AWS Glue connections to be used by the job.
+:type connections: list
+:param retry_limit: The maximum number of times to retry this job if it 
fails
+:type retry_limit:Optional[int]
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job.
+:type num_of_dpus: int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be 
uploaded
+:type s3_bucket: Optional[str]
+:param iam_role_name: AWS IAM Role for Glue Job Execution
+:type iam_role_name: Optional[str]
+"""
+template_fields = ()
+template_ext = ()
+ui_color = '#ededed'
+
+@apply_defaults
+def __init__(self,
+ job_name='aws_glue_default_job',
+ job_desc='AWS Glue Job with Airflow',
+ script_location=None,
+ concurrent_run_limit=None,
+ script_args=None,
+ connections=None,
+ retry_limit=None,
+ num_of_dpus=6,
+ aws_conn_id='aws_default',
+ region_name=None,
+ s3_bucket=None,
+ iam_role_name=None,
+ *args, **kwargs
+ ):
+super(AWSGlueJobOperator, self).__init__(*args, **kwargs)
+self.job_name = job_name
+self.job_desc = job_desc
+self.script_location = script_location
+self.concurrent_run_limit = concurrent_run_limit
+self.script_args = script_args or {}
+self.connections = connections or []
+self.retry_limit = retry_limit
+self.num_of_dpus = num_of_dpus
+self.aws_conn_id = aws_conn_id,
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.iam_role_name = iam_role_name
+self.S3_PROTOCOL = "s3://"
+self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/'
+
+def execute(self, context):
+"""
+Executes AWS Glue Job from Airflow
+
+:return: the id of the current glue job.
+"""
+if not self.script_location.startsWith(self.S3_PROTOCOL):
 
 Review comment:
   OK, sounds good to me. But then we should change the description of the
operator. WDYT? @abdulbasitds


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-11 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS 
Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r407034889
 
 

 ##
 File path: airflow/providers/amazon/aws/operators/glue.py
 ##
 @@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import unicode_literals
+
+from airflow.providers.amazon.aws.hooks.glue import AwsGlueJobHook
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+import os.path
+
+
+class AWSGlueJobOperator(BaseOperator):
+"""
+Creates an AWS Glue Job. AWS Glue is a serverless Spark
+ETL service for running Spark Jobs on the AWS cloud.
+Language support: Python and Scala
+
+:param job_name: unique job name per AWS Account
+:type job_name: Optional[str]
+:param script_location: location of ETL script. Must be a local or S3 path
+:type script_location: Optional[str]
+:param job_desc: job description details
+:type job_desc: Optional[str]
+:param concurrent_run_limit: The maximum number of concurrent runs allowed 
for a job
+:type concurrent_run_limit: Optional[int]
+:param script_args: etl script arguments and AWS Glue arguments
+:type script_args: dict
+:param connections: AWS Glue connections to be used by the job.
+:type connections: list
+:param retry_limit: The maximum number of times to retry this job if it 
fails
+:type retry_limit:Optional[int]
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job.
+:type num_of_dpus: int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be 
uploaded
+:type s3_bucket: Optional[str]
+:param iam_role_name: AWS IAM Role for Glue Job Execution
+:type iam_role_name: Optional[str]
+"""
+template_fields = ()
+template_ext = ()
+ui_color = '#ededed'
+
+@apply_defaults
+def __init__(self,
+ job_name='aws_glue_default_job',
+ job_desc='AWS Glue Job with Airflow',
+ script_location=None,
+ concurrent_run_limit=None,
+ script_args=None,
+ connections=None,
+ retry_limit=None,
+ num_of_dpus=6,
+ aws_conn_id='aws_default',
+ region_name=None,
+ s3_bucket=None,
+ iam_role_name=None,
+ *args, **kwargs
+ ):
+super(AWSGlueJobOperator, self).__init__(*args, **kwargs)
+self.job_name = job_name
+self.job_desc = job_desc
+self.script_location = script_location
+self.concurrent_run_limit = concurrent_run_limit
+self.script_args = script_args or {}
+self.connections = connections or []
+self.retry_limit = retry_limit
+self.num_of_dpus = num_of_dpus
+self.aws_conn_id = aws_conn_id,
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.iam_role_name = iam_role_name
+self.S3_PROTOCOL = "s3://"
+self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/'
+
+def execute(self, context):
+"""
+Executes AWS Glue Job from Airflow
+
+:return: the id of the current glue job.
+"""
+if not self.script_location.startsWith(self.S3_PROTOCOL):
 
 Review comment:
   `self.script_location` should be an `arg` not a `kwarg`. It is required to 
pass a script, isn't it? So we should not default it to `None`.
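   A minimal sketch of that signature change (illustrative only; the other parameters are elided):

   ```python
   # Sketch only: script_location as a required positional argument, so a
   # missing script fails at task construction instead of mid-execution.
   from airflow.models import BaseOperator
   from airflow.utils.decorators import apply_defaults


   class AWSGlueJobOperator(BaseOperator):
       @apply_defaults
       def __init__(self,
                    script_location,   # required, no None default
                    job_name='aws_glue_default_job',
                    *args, **kwargs):
           super().__init__(*args, **kwargs)
           self.script_location = script_location
           self.job_name = job_name
   ```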




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS 
Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402445986
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,172 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+import time
+
+
+class AwsGlueJobHook(AwsBaseHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type job_name: Optional[str]
+:param desc: job description
+:type desc: Optional[str]
+:param concurrent_run_limit: The maximum number of concurrent runs allowed 
for a job
+:type concurrent_run_limit: int
+:param script_location: path to etl script either on s3 or local
 
 Review comment:
   The hook does not. The operator does.
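   For illustration, a loose sketch of that division of labor on the operator side (the prefixes follow the diff above; the helper itself is hypothetical):

   ```python
   # Sketch only: the operator uploads a local script via S3Hook and hands
   # the resulting s3:// URI to the hook, which accepts only S3 locations.
   from os import path

   from airflow.providers.amazon.aws.hooks.s3 import S3Hook

   S3_PROTOCOL = "s3://"
   S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/'


   def ensure_s3_script(script_location, s3_bucket, aws_conn_id='aws_default'):
       """Return an s3:// URI, uploading the script first when it is local."""
       if script_location.startswith(S3_PROTOCOL):
           return script_location
       s3_key = S3_ARTIFACTS_PREFIX + path.basename(script_location)
       S3Hook(aws_conn_id=aws_conn_id).load_file(script_location, s3_key,
                                                 bucket_name=s3_bucket)
       return '{}{}/{}'.format(S3_PROTOCOL, s3_bucket, s3_key)
   ```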




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS 
Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402401499
 
 

 ##
 File path: airflow/providers/amazon/aws/operators/glue.py
 ##
 @@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import unicode_literals
+
+from airflow.providers.amazon.aws.hooks.glue import AwsGlueJobHook
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+import os.path
+
+
+class AWSGlueJobOperator(BaseOperator):
+"""
+Creates an AWS Glue Job. AWS Glue is a serverless Spark
+ETL service for running Spark Jobs on the AWS cloud.
+Language support: Python and Scala
+
+:param job_name: unique job name per AWS Account
+:type job_name: Optional[str]
+:param script_location: location of ETL script. Must be a local or S3 path
+:type script_location: Optional[str]
+:param job_desc: job description details
+:type job_desc: Optional[str]
+:param concurrent_run_limit: The maximum number of concurrent runs allowed 
for a job
+:type concurrent_run_limit: Optional[int]
+:param script_args: etl script arguments and AWS Glue arguments
+:type script_args: dict
+:param connections: AWS Glue connections to be used by the job.
+:type connections: list
 
 Review comment:
   This argument is not used, is it? What is this supposed to do?
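   For reference, a hedged sketch of how the parameter could be wired through — boto3's `create_job` accepts a `Connections` structure; the surrounding names are illustrative:

   ```python
   # Sketch only: attach Glue connections when creating the job via boto3.
   def create_glue_job(glue_client, name, role, script_location, connections=None):
       kwargs = {
           'Name': name,
           'Role': role,
           'Command': {'Name': 'glueetl', 'ScriptLocation': script_location},
       }
       if connections:
           # boto3 expects the list wrapped in a 'Connections' dict.
           kwargs['Connections'] = {'Connections': connections}
       return glue_client.create_job(**kwargs)
   ```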




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS 
Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402396517
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,172 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+import time
+
+
+class AwsGlueJobHook(AwsBaseHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type job_name: Optional[str]
+:param desc: job description
+:type desc: Optional[str]
+:param concurrent_run_limit: The maximum number of concurrent runs allowed 
for a job
+:type concurrent_run_limit: int
+:param script_location: path to etl script either on s3 or local
 
 Review comment:
   The description is now wrong! It only allows an S3 location!
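   A possible fix in the suggestion style used elsewhere in this thread (the wording is a guess at the intent, matching the later revision of the hook docstring):

   ```suggestion
   :param script_location: path to etl script on s3
   ```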




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS 
Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402402859
 
 

 ##
 File path: airflow/providers/amazon/aws/sensors/glue.py
 ##
 @@ -0,0 +1,56 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.providers.amazon.aws.hooks.glue import AwsGlueJobHook
+
+
+class AwsGlueJobSensor(BaseSensorOperator):
+"""
+Waits for an AWS Glue Job to reach any of the status below
+'FAILED', 'STOPPED', 'SUCCEEDED'
+
+:param job_name: The AWS Glue Job unique name
+:type job_name: str
+:param run_id: The AWS Glue current running job identifier
+:type str
 
 Review comment:
   ```suggestion
   :type run_id: str
   ```
   I would really appreciate it if you could take another look at your
documentation.




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS 
Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402395249
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,172 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+import time
+
+
+class AwsGlueJobHook(AwsBaseHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type job_name: Optional[str]
+:param desc: job description
+:type desc: Optional[str]
+:param concurrent_run_limit: The maximum number of concurrent runs allowed 
for a job
+:type concurrent_run_limit: int
+:param script_location: path to etl script either on s3 or local
+:type script_location: Optional[str]
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type retry_limit: int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type num_of_dpus: int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: Optional[str]
+:param iam_role_name: AWS IAM Role for Glue Job
+:type iam_role_name: Optional[str]
+:param s3_bucket: S3 bucket where logs and local etl script will be 
uploaded
+:type s3_bucket: Optional[str]
+"""
+JOB_POLL_INTERVAL = 6 # polls job status after every JOB_POLL_INTERVAL 
seconds
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=1,
+ script_location=None,
+ retry_limit=0,
+ num_of_dpus=10,
+ region_name=None,
+ iam_role_name=None,
+ s3_bucket, *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit
+self.script_location = script_location
+self.retry_limit = retry_limit
+self.num_of_dpus = num_of_dpus
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.role_name = iam_role_name
+self.S3_GLUE_LOGS = 'logs/glue-logs/'
+super(AwsGlueJobHook, self).__init__(*args, **kwargs)
+
+def get_conn(self):
+conn = self.get_client_type('glue', self.region_name)
+return conn
+
+def list_jobs(self):
+conn = self.get_conn()
+return conn.get_jobs()
+
+def get_iam_execution_role(self):
+"""
+:return: iam role for job execution
+"""
+iam_client = self.get_client_type('iam', self.region_name)
+
+try:
+glue_execution_role = iam_client.get_role(RoleName=self.role_name)
+self.log.info("Iam Role Name: %s", self.role_name)
+return glue_execution_role
+except Exception as general_error:
+self.log.error(f'Failed to create aws glue job, error: 
{str(general_error)}')
+raise
+
+def initialize_job(self, script_arguments=None):
+"""
+Initializes connection with AWS Glue
+to run job
+:return:
+"""
+if self.s3_bucket is None:
+raise AirflowException(
+'Could not initialize glue job, '
+'error: Specify Parameter `s3_bucket`'
+)
+
+glue_client = self.get_conn()
+
+try:
+job_name = self.get_or_create_glue_job()
+job_run = glue_client.start_job_run(
+JobName=job_name,
+Arguments=script_arguments
+)
+return self.job_completion(job_name, job_run['JobRunId'])
+except Exception as general_error:
+self.log.error(f'Failed to run aws glue job, error: 
{str(general_error)}')
+raise
+
+def job_completion(self, job_name=None, run_id=None):
+"""
+:param job_name: unique job name per AWS account
+:type job_name: str
+:param run_id: The job-run ID of the predecessor job run
+

[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS 
Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402350732
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsBaseHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type job_name: str
+:param desc: job description
+:type desc: str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed 
for a job
+:type concurrent_run_limit: int
+:param script_location: path to etl script either on s3 or local
+:type script_location: str
+:param conns: A list of connections used by the job
+:type conns: list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type retry_limit: int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type num_of_dpus: int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type iam_role_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be 
uploaded
+:type s3_bucket: str
+"""
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=1,
+ script_location=None,
+ conns=None,
+ retry_limit=0,
+ num_of_dpus=10,
+ aws_conn_id='aws_default',
+ region_name=None,
+ iam_role_name=None,
+ s3_bucket=None, *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit
+self.script_location = script_location
+self.conns = conns or ["s3"]
+self.retry_limit = retry_limit
+self.num_of_dpus = num_of_dpus
+self.aws_conn_id = aws_conn_id
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.role_name = iam_role_name
+self.S3_PROTOCOL = "s3://"
+self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/'
+self.S3_GLUE_LOGS = 'logs/glue-logs/'
+super(AwsGlueJobHook, self).__init__(*args, **kwargs)
+
+def get_conn(self):
+conn = self.get_client_type('glue', self.region_name)
+return conn
+
+def list_jobs(self):
+conn = self.get_conn()
+return conn.get_jobs()
+
+def get_iam_execution_role(self):
+"""
+:return: iam role for job execution
+"""
+iam_client = self.get_client_type('iam', self.region_name)
+
+try:
+glue_execution_role = iam_client.get_role(RoleName=self.role_name)
+self.log.info("Iam Role Name: %s", self.role_name)
+return glue_execution_role
+except Exception as general_error:
+self.log.error(f'Failed to create aws glue job, error: 
{str(general_error)}')
+raise
+
+def initialize_job(self, script_arguments=None):
+"""
+Initializes connection with AWS Glue
+to run job
+:return:
+"""
+if self.s3_bucket is None:
+raise AirflowException(
+'Could not initialize glue job, '
+'error: Specify Parameter `s3_bucket`'
+)
+
+glue_client = self.get_conn()
+
+try:
+job_response = self.get_or_create_glue_job()
+job_name = job_response
+job_run = glue_client.start_job_run(
+JobName=job_name,
+Arguments=script_arguments
+)
+return self.job_completion(job_name, job_run['JobRunId'])
+except Exception as general_error:
+self.log.error(f'Failed to run aws glue job, error: 
{str(general_error)}')
+  

[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS 
Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402342249
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsBaseHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type job_name: str
+:param desc: job description
+:type desc: str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed 
for a job
+:type concurrent_run_limit: int
+:param script_location: path to etl script either on s3 or local
+:type script_location: str
+:param conns: A list of connections used by the job
+:type conns: list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type retry_limit: int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type num_of_dpus: int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type iam_role_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be 
uploaded
+:type s3_bucket: str
+"""
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=1,
+ script_location=None,
+ conns=None,
+ retry_limit=0,
+ num_of_dpus=10,
+ aws_conn_id='aws_default',
+ region_name=None,
+ iam_role_name=None,
+ s3_bucket=None, *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit
+self.script_location = script_location
+self.conns = conns or ["s3"]
+self.retry_limit = retry_limit
+self.num_of_dpus = num_of_dpus
+self.aws_conn_id = aws_conn_id
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.role_name = iam_role_name
+self.S3_PROTOCOL = "s3://"
+self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/'
+self.S3_GLUE_LOGS = 'logs/glue-logs/'
+super(AwsGlueJobHook, self).__init__(*args, **kwargs)
+
+def get_conn(self):
+conn = self.get_client_type('glue', self.region_name)
+return conn
+
+def list_jobs(self):
+conn = self.get_conn()
+return conn.get_jobs()
+
+def get_iam_execution_role(self):
+"""
+:return: iam role for job execution
+"""
+iam_client = self.get_client_type('iam', self.region_name)
+
+try:
+glue_execution_role = iam_client.get_role(RoleName=self.role_name)
+self.log.info("Iam Role Name: %s", self.role_name)
+return glue_execution_role
+except Exception as general_error:
+self.log.error(f'Failed to create aws glue job, error: 
{str(general_error)}')
+raise
 
 Review comment:
   Yes.




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS 
Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402314016
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsBaseHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type job_name: str
+:param desc: job description
+:type desc: str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed 
for a job
+:type concurrent_run_limit: int
+:param script_location: path to etl script either on s3 or local
+:type script_location: str
+:param conns: A list of connections used by the job
+:type conns: list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type retry_limit: int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type num_of_dpus: int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type iam_role_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be 
uploaded
+:type s3_bucket: str
+"""
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=1,
+ script_location=None,
+ conns=None,
+ retry_limit=0,
+ num_of_dpus=10,
+ aws_conn_id='aws_default',
+ region_name=None,
+ iam_role_name=None,
+ s3_bucket=None, *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit
+self.script_location = script_location
+self.conns = conns or ["s3"]
+self.retry_limit = retry_limit
+self.num_of_dpus = num_of_dpus
+self.aws_conn_id = aws_conn_id
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.role_name = iam_role_name
+self.S3_PROTOCOL = "s3://"
+self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/'
+self.S3_GLUE_LOGS = 'logs/glue-logs/'
+super(AwsGlueJobHook, self).__init__(*args, **kwargs)
+
+def get_conn(self):
+conn = self.get_client_type('glue', self.region_name)
+return conn
+
+def list_jobs(self):
+conn = self.get_conn()
+return conn.get_jobs()
+
+def get_iam_execution_role(self):
+"""
+:return: iam role for job execution
+"""
+iam_client = self.get_client_type('iam', self.region_name)
+
+try:
+glue_execution_role = iam_client.get_role(RoleName=self.role_name)
+self.log.info("Iam Role Name: %s", self.role_name)
+return glue_execution_role
+except Exception as general_error:
+self.log.error(f'Failed to create aws glue job, error: 
{str(general_error)}')
+raise
+
+def initialize_job(self, script_arguments=None):
+"""
+Initializes connection with AWS Glue
+to run job
+:return:
+"""
+if self.s3_bucket is None:
+raise AirflowException(
+'Could not initialize glue job, '
+'error: Specify Parameter `s3_bucket`'
+)
+
+glue_client = self.get_conn()
+
+try:
+job_response = self.get_or_create_glue_job()
+job_name = job_response
+job_run = glue_client.start_job_run(
+JobName=job_name,
+Arguments=script_arguments
+)
+return self.job_completion(job_name, job_run['JobRunId'])
+except Exception as general_error:
+self.log.error(f'Failed to run aws glue job, error: 
{str(general_error)}')
+  

[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS 
Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402316781
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsBaseHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type job_name: str
+:param desc: job description
+:type desc: str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed 
for a job
+:type concurrent_run_limit: int
+:param script_location: path to etl script either on s3 or local
+:type script_location: str
+:param conns: A list of connections used by the job
+:type conns: list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type retry_limit: int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type num_of_dpus: int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type iam_role_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be 
uploaded
+:type s3_bucket: str
+"""
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=1,
+ script_location=None,
+ conns=None,
+ retry_limit=0,
+ num_of_dpus=10,
+ aws_conn_id='aws_default',
+ region_name=None,
+ iam_role_name=None,
+ s3_bucket=None, *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit
+self.script_location = script_location
+self.conns = conns or ["s3"]
+self.retry_limit = retry_limit
+self.num_of_dpus = num_of_dpus
+self.aws_conn_id = aws_conn_id
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.role_name = iam_role_name
+self.S3_PROTOCOL = "s3://"
+self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/'
+self.S3_GLUE_LOGS = 'logs/glue-logs/'
+super(AwsGlueJobHook, self).__init__(*args, **kwargs)
+
+def get_conn(self):
+conn = self.get_client_type('glue', self.region_name)
+return conn
+
+def list_jobs(self):
+conn = self.get_conn()
+return conn.get_jobs()
+
+def get_iam_execution_role(self):
+"""
+:return: iam role for job execution
+"""
+iam_client = self.get_client_type('iam', self.region_name)
+
+try:
+glue_execution_role = iam_client.get_role(RoleName=self.role_name)
+self.log.info("Iam Role Name: %s", self.role_name)
+return glue_execution_role
+except Exception as general_error:
+self.log.error(f'Failed to create aws glue job, error: 
{str(general_error)}')
+raise
 
 Review comment:
   Yes, but `get_role` is accessing the `boto` library directly (because
`get_client_type` returns a `boto` object), which should really only be done
by the related hook. If we had one, it would be `AwsIamHook`.
   
   But if you cannot make it work with the functions in `AwsBaseHook`, you can
access it for now like you do.
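   For illustration, a rough sketch of such a hook — hypothetical, since no `AwsIamHook` exists in Airflow:

   ```python
   # Hypothetical sketch only: isolating IAM access behind its own hook
   # built on AwsBaseHook, instead of calling boto3 from the Glue hook.
   from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook


   class AwsIamHook(AwsBaseHook):
       """Thin wrapper exposing IAM role lookups through the hook layer."""

       def __init__(self, *args, **kwargs):
           super().__init__(client_type='iam', *args, **kwargs)

       def get_role(self, role_name):
           # get_conn() returns the boto3 'iam' client built by the base hook.
           return self.get_conn().get_role(RoleName=role_name)
   ```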




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS 
Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402290218
 
 

 ##
 File path: airflow/providers/amazon/aws/sensors/glue.py
 ##
 @@ -0,0 +1,57 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.providers.amazon.aws.hooks.glue import AwsGlueJobHook
+
+
+class AwsGlueJobSensor(BaseSensorOperator):
+"""
+Waits for an AWS Glue Job to reach any of the status below
+'FAILED', 'STOPPED', 'SUCCEEDED'
+
+:param job_name: The AWS Glue Job unique name
+:type str
+:param run_id: The AWS Glue current running job identifier
+:type str
+"""
+template_fields = ('job_name', 'run_id')
+
+@apply_defaults
+def __init__(self,
+ job_name,
+ run_id,
+ aws_conn_id='aws_default',
+ *args,
+ **kwargs):
+super(AwsGlueJobSensor, self).__init__(*args, **kwargs)
+self.job_name = job_name
+self.run_id = run_id
+self.aws_conn_id = aws_conn_id
+self.targeted_status = ['FAILED', 'STOPPED', 'SUCCEEDED']
+
+def poke(self, context):
+self.log.info("Poking for job run status : {self.targeted_status}\n"
+  "for Glue Job {self.job_name} and ID {self.run_id}"
+  .format(**locals()))
+hook = AwsGlueJobHook(aws_conn_id=self.aws_conn_id)
+job_state = hook.job_completion(job_name=self.job_name,
+run_id=self.run_id)
+return job_state['JobRunState'].upper() in self.targeted_status
 
 Review comment:
   If it is like you said you can mark it resolved.




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS 
Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402155647
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsBaseHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type job_name: str
+:param desc: job description
+:type desc: str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed for a job
+:type concurrent_run_limit: int
+:param script_location: path to etl script either on s3 or local
+:type script_location: str
+:param conns: A list of connections used by the job
+:type conns: list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type retry_limit: int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type num_of_dpus: int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type iam_role_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be uploaded
+:type s3_bucket: str
+"""
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=1,
+ script_location=None,
+ conns=None,
+ retry_limit=0,
+ num_of_dpus=10,
+ aws_conn_id='aws_default',
+ region_name=None,
+ iam_role_name=None,
+ s3_bucket=None, *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit
+self.script_location = script_location
+self.conns = conns or ["s3"]
 
 Review comment:
   This value is not being used.




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402150450
 
 

 ##
 File path: airflow/providers/amazon/aws/operators/glue.py
 ##
 @@ -0,0 +1,110 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import unicode_literals
+
+from airflow.providers.amazon.aws.hooks.glue import AwsGlueJobHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class AWSGlueJobOperator(BaseOperator):
+"""
+Creates an AWS Glue Job. AWS Glue is a serverless Spark
+ETL service for running Spark Jobs on the AWS cloud.
+Language support: Python and Scala
+
+:param job_name: unique job name per AWS Account
+:type str
+:param script_location: location of ETL script. Must be a local or S3 path
+:type str
+:param job_desc: job description details
+:type str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed for a job
+:type int
+:param script_args: etl script arguments and AWS Glue arguments
+:type dict
+:param connections: AWS Glue connections to be used by the job.
+:type list
+:param retry_limit: The maximum number of times to retry this job if it fails
+:type int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job.
+:type int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be uploaded
+:type str
+:param iam_role_name: AWS IAM Role for Glue Job Execution
+:type str
+"""
+template_fields = ()
+template_ext = ()
+ui_color = '#ededed'
+
+@apply_defaults
+def __init__(self,
+ job_name='aws_glue_default_job',
+ job_desc='AWS Glue Job with Airflow',
+ script_location=None,
+ concurrent_run_limit=None,
+ script_args={},
+ connections=[],
+ retry_limit=None,
+ num_of_dpus=6,
+ aws_conn_id='aws_default',
+ region_name=None,
+ s3_bucket=None,
+ iam_role_name=None,
+ *args, **kwargs
+ ):
+super(AWSGlueJobOperator, self).__init__(*args, **kwargs)
+self.job_name = job_name
+self.job_desc = job_desc
+self.script_location = script_location
+self.concurrent_run_limit = concurrent_run_limit
+self.script_args = script_args
+self.connections = connections
+self.retry_limit = retry_limit
+self.num_of_dpus = num_of_dpus
+self.aws_conn_id = aws_conn_id
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.iam_role_name = iam_role_name
+
+def execute(self, context):
+"""
+Executes AWS Glue Job from Airflow
+:return:
+"""
+task_instance = context['ti']
+glue_job = AwsGlueJobHook(job_name=self.job_name,
+  desc=self.job_desc,
+  concurrent_run_limit=self.concurrent_run_limit,
+  script_location=self.script_location,
+  conns=self.connections,
+  retry_limit=self.retry_limit,
+  num_of_dpus=self.num_of_dpus,
+  aws_conn_id=self.aws_conn_id,
+  region_name=self.region_name,
+  s3_bucket=self.s3_bucket,
+  iam_role_name=self.iam_role_name)
+
+self.log.info("Initializing AWS Glue Job: %s",self.job_name)
+glue_job_run = glue_job.initialize_job(self.script_args)
+task_instance.xcom_push(key='run_id', value=glue_job_run['JobRunId'])
+self.log.info("AWS Glue Job: %s status: %s. Run Id: 
%s",glue_job_run['JobRunId'],self.job_name,glue_job_run['JobRunState'])
 
 Review comment:
   ```suggestion
   

[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402143997
 
 

 ##
 File path: airflow/providers/amazon/aws/operators/glue.py
 ##
 @@ -0,0 +1,110 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import unicode_literals
+
+from airflow.providers.amazon.aws.hooks.glue import AwsGlueJobHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class AWSGlueJobOperator(BaseOperator):
+"""
+Creates an AWS Glue Job. AWS Glue is a serverless Spark
+ETL service for running Spark Jobs on the AWS cloud.
+Language support: Python and Scala
+
+:param job_name: unique job name per AWS Account
+:type str
 
 Review comment:
   It would also be nice for the user if you could update the documentation and add a note that if it is None the job will be created...
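   For example, a note along these lines in the docstring would work (the wording here is just a sketch, not taken from the PR):
   
   ```
   :param job_name: unique job name per AWS Account. If no job with this
       name exists yet, it will be created before the run is started.
   :type job_name: Optional[str]
   ```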




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402153063
 
 

 ##
 File path: tests/providers/amazon/aws/operators/test_aws_glue_job_operator.py
 ##
 @@ -0,0 +1,57 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+from airflow import configuration
+from airflow.contrib.hooks.aws_glue_job_hook import AwsGlueJobHook
+from airflow.contrib.operators.aws_glue_job_operator import AWSGlueJobOperator
+
+try:
+from unittest import mock
+except ImportError:
+try:
+import mock
+except ImportError:
+mock = None
+
+
+class TestAwsGlueJobOperator(unittest.TestCase):
+
+@mock.patch('airflow.contrib.operators.aws_glue_job_operator.AwsGlueJobHook')
 
 Review comment:
   And do not forget to change the path here as well to the `providers` path.
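   Based on the file paths in this PR, the patch target would presumably become:
   
   ```
   @mock.patch('airflow.providers.amazon.aws.operators.glue.AwsGlueJobHook')
   ```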




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402119711
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsBaseHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type job_name: str
+:param desc: job description
+:type desc: str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed for a job
+:type concurrent_run_limit: int
+:param script_location: path to etl script either on s3 or local
+:type script_location: str
+:param conns: A list of connections used by the job
+:type conns: list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type retry_limit: int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type num_of_dpus: int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type iam_role_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be uploaded
+:type s3_bucket: str
+"""
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=1,
+ script_location=None,
+ conns=None,
+ retry_limit=0,
+ num_of_dpus=10,
+ aws_conn_id='aws_default',
+ region_name=None,
+ iam_role_name=None,
+ s3_bucket=None, *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit
+self.script_location = script_location
+self.conns = conns or ["s3"]
 
 Review comment:
   ```suggestion
   ```




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402148111
 
 

 ##
 File path: airflow/providers/amazon/aws/operators/glue.py
 ##
 @@ -0,0 +1,110 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import unicode_literals
+
+from airflow.providers.amazon.aws.hooks.glue import AwsGlueJobHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class AWSGlueJobOperator(BaseOperator):
+"""
+Creates an AWS Glue Job. AWS Glue is a serverless Spark
+ETL service for running Spark Jobs on the AWS cloud.
+Language support: Python and Scala
+
+:param job_name: unique job name per AWS Account
+:type str
+:param script_location: location of ETL script. Must be a local or S3 path
+:type str
+:param job_desc: job description details
+:type str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed for a job
+:type int
+:param script_args: etl script arguments and AWS Glue arguments
+:type dict
+:param connections: AWS Glue connections to be used by the job.
+:type list
+:param retry_limit: The maximum number of times to retry this job if it fails
+:type int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job.
+:type int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be uploaded
+:type str
+:param iam_role_name: AWS IAM Role for Glue Job Execution
+:type str
+"""
+template_fields = ()
+template_ext = ()
+ui_color = '#ededed'
+
+@apply_defaults
+def __init__(self,
+ job_name='aws_glue_default_job',
+ job_desc='AWS Glue Job with Airflow',
+ script_location=None,
+ concurrent_run_limit=None,
+ script_args={},
+ connections=[],
+ retry_limit=None,
+ num_of_dpus=6,
+ aws_conn_id='aws_default',
+ region_name=None,
+ s3_bucket=None,
+ iam_role_name=None,
+ *args, **kwargs
+ ):
+super(AWSGlueJobOperator, self).__init__(*args, **kwargs)
+self.job_name = job_name
+self.job_desc = job_desc
+self.script_location = script_location
+self.concurrent_run_limit = concurrent_run_limit
+self.script_args = script_args
+self.connections = connections
+self.retry_limit = retry_limit
+self.num_of_dpus = num_of_dpus
+self.aws_conn_id = aws_conn_id
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.iam_role_name = iam_role_name
+
+def execute(self, context):
+"""
+Executes AWS Glue Job from Airflow
 
 Review comment:
   ```suggestion
   Executes AWS Glue Job from Airflow
   
   ```




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402135822
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.contrib.hooks.aws_hook import AwsHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type str
+:param desc: job description
+:type str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed for a job
+:type int
+:param script_location: path to etl script either on s3 or local
+:type str
+:param conns: A list of connections used by the job
+:type list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be uploaded
+:type str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type str
+"""
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=None,
+ script_location=None,
+ conns=None,
+ retry_limit=None,
+ num_of_dpus=None,
+ aws_conn_id='aws_default',
+ region_name=None,
+ iam_role_name=None,
+ s3_bucket=None, *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit or 1
+self.script_location = script_location
+self.conns = conns or ["s3"]
+self.retry_limit = retry_limit or 0
+self.num_of_dpus = num_of_dpus or 10
+self.aws_conn_id = aws_conn_id
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.role_name = iam_role_name
+self.S3_PROTOCOL = "s3://"
+self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/'
+self.S3_GLUE_LOGS = 'logs/glue-logs/'
+super(AwsGlueJobHook, self).__init__(*args, **kwargs)
+
+def get_conn(self):
+conn = self.get_client_type('glue', self.region_name)
+return conn
+
+def list_jobs(self):
+conn = self.get_conn()
+return conn.get_jobs()
+
+def get_iam_execution_role(self):
+"""
+:return: iam role for job execution
+"""
+iam_client = self.get_client_type('iam', self.region_name)
+
+try:
+glue_execution_role = iam_client.get_role(RoleName=self.role_name)
+self.log.info("Iam Role Name: {}".format(self.role_name))
+return glue_execution_role
+except Exception as general_error:
+raise AirflowException(
+'Failed to create aws glue job, error: {error}'.format(
+error=str(general_error)
+)
+)
+
+def initialize_job(self, script_arguments=None):
+"""
+Initializes connection with AWS Glue
+to run job
+:return:
+"""
+if self.s3_bucket is None:
+raise AirflowException(
+'Could not initialize glue job, '
+'error: Specify Parameter `s3_bucket`'
+)
+
+glue_client = self.get_conn()
+
+try:
+job_response = self.get_or_create_glue_job()
+job_name = job_response
+job_run = glue_client.start_job_run(
+JobName=job_name,
+Arguments=script_arguments
+)
+return self.job_completion(job_name, job_run['JobRunId'])
+except Exception as general_error:
+raise AirflowException(
+'Failed to run aws glue job, error: {error}'.format(
+ 

[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402142747
 
 

 ##
 File path: airflow/providers/amazon/aws/operators/glue.py
 ##
 @@ -0,0 +1,110 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import unicode_literals
+
+from airflow.providers.amazon.aws.hooks.glue import AwsGlueJobHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class AWSGlueJobOperator(BaseOperator):
+"""
+Creates an AWS Glue Job. AWS Glue is a serverless Spark
+ETL service for running Spark Jobs on the AWS cloud.
+Language support: Python and Scala
+
+:param job_name: unique job name per AWS Account
+:type str
 
 Review comment:
   ```suggestion
   :type job_name: Optional[str]
   ```
   
   The same I mentioned in the AwsGlueHook ;)




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402151418
 
 

 ##
 File path: airflow/providers/amazon/aws/sensors/glue.py
 ##
 @@ -0,0 +1,57 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.providers.amazon.aws.hooks.glue import AwsGlueJobHook
+
+
+class AwsGlueJobSensor(BaseSensorOperator):
+"""
+Waits for an AWS Glue Job to reach any of the status below
+'FAILED', 'STOPPED', 'SUCCEEDED'
+
+:param job_name: The AWS Glue Job unique name
+:type str
+:param run_id: The AWS Glue current running job identifier
+:type str
+"""
+template_fields = ('job_name', 'run_id')
+
+@apply_defaults
+def __init__(self,
+ job_name,
+ run_id,
+ aws_conn_id='aws_default',
+ *args,
+ **kwargs):
+super(AwsGlueJobSensor, self).__init__(*args, **kwargs)
+self.job_name = job_name
+self.run_id = run_id
+self.aws_conn_id = aws_conn_id
+self.targeted_status = ['FAILED', 'STOPPED', 'SUCCEEDED']
+
+def poke(self, context):
+self.log.info("Poking for job run status : {self.targeted_status}\n"
+  "for Glue Job {self.job_name} and ID {self.run_id}"
+  .format(**locals()))
+hook = AwsGlueJobHook(aws_conn_id=self.aws_conn_id)
+job_state = hook.job_completion(job_name=self.job_name,
+run_id=self.run_id)
+return job_state['JobRunState'].upper() in self.targeted_status
 
 Review comment:
   Do we really need the `.upper()` here?
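   For context: the Glue `JobRunState` values documented by AWS ('STARTING', 'RUNNING', 'STOPPING', 'STOPPED', 'SUCCEEDED', 'FAILED', 'TIMEOUT') are already uppercase, so assuming boto3 returns them verbatim, a plain membership test should be enough:
   
   ```
   return job_state['JobRunState'] in self.targeted_status
   ```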




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402151955
 
 

 ##
 File path: airflow/providers/amazon/aws/sensors/glue.py
 ##
 @@ -0,0 +1,57 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.providers.amazon.aws.hooks.glue import AwsGlueJobHook
+
+
+class AwsGlueJobSensor(BaseSensorOperator):
+"""
+Waits for an AWS Glue Job to reach any of the status below
+'FAILED', 'STOPPED', 'SUCCEEDED'
+
+:param job_name: The AWS Glue Job unique name
+:type str
 
 Review comment:
   ```suggestion
   :type job_name: str
   ```
   ;)




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402122647
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsBaseHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type job_name: str
 
 Review comment:
   Same below.
   
   Note that it is the same for type `int`: it should be `Optional[int]` if you can leave it empty, otherwise just `int`.
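   A short sketch of the convention, using parameters from this hook (which parameters are truly optional is of course your call):
   
   ```
   :param job_name: unique job name per AWS account
   :type job_name: Optional[str]
   :param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
   :type num_of_dpus: int
   ```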




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402119817
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsBaseHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type job_name: str
+:param desc: job description
+:type desc: str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed for a job
+:type concurrent_run_limit: int
+:param script_location: path to etl script either on s3 or local
+:type script_location: str
+:param conns: A list of connections used by the job
+:type conns: list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type retry_limit: int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type num_of_dpus: int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type iam_role_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be uploaded
+:type s3_bucket: str
+"""
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=1,
+ script_location=None,
+ conns=None,
+ retry_limit=0,
+ num_of_dpus=10,
+ aws_conn_id='aws_default',
 
 Review comment:
   ```suggestion
   ```
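   Dropping it should be safe because `AwsBaseHook` itself accepts `aws_conn_id` and the hook forwards `*args, **kwargs` to it in `super().__init__`, so callers can still pass the connection id explicitly (the role name below is made up for illustration):
   
   ```
   hook = AwsGlueJobHook(iam_role_name='my_glue_role', aws_conn_id='aws_default')
   ```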




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402127385
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsBaseHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type job_name: str
+:param desc: job description
+:type desc: str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed for a job
+:type concurrent_run_limit: int
+:param script_location: path to etl script either on s3 or local
+:type script_location: str
+:param conns: A list of connections used by the job
+:type conns: list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type retry_limit: int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type num_of_dpus: int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type iam_role_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be uploaded
+:type s3_bucket: str
+"""
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=1,
+ script_location=None,
+ conns=None,
+ retry_limit=0,
+ num_of_dpus=10,
+ aws_conn_id='aws_default',
+ region_name=None,
+ iam_role_name=None,
+ s3_bucket=None, *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit
+self.script_location = script_location
+self.conns = conns or ["s3"]
+self.retry_limit = retry_limit
+self.num_of_dpus = num_of_dpus
+self.aws_conn_id = aws_conn_id
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.role_name = iam_role_name
+self.S3_PROTOCOL = "s3://"
+self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/'
+self.S3_GLUE_LOGS = 'logs/glue-logs/'
+super(AwsGlueJobHook, self).__init__(*args, **kwargs)
+
+def get_conn(self):
+conn = self.get_client_type('glue', self.region_name)
+return conn
+
+def list_jobs(self):
+conn = self.get_conn()
+return conn.get_jobs()
+
+def get_iam_execution_role(self):
+"""
+:return: iam role for job execution
+"""
+iam_client = self.get_client_type('iam', self.region_name)
+
+try:
+glue_execution_role = iam_client.get_role(RoleName=self.role_name)
+self.log.info("Iam Role Name: %s", self.role_name)
+return glue_execution_role
+except Exception as general_error:
+self.log.error(f'Failed to create aws glue job, error: {str(general_error)}')
+raise
+
+def initialize_job(self, script_arguments=None):
+"""
+Initializes connection with AWS Glue
+to run job
+:return:
+"""
+if self.s3_bucket is None:
+raise AirflowException(
+'Could not initialize glue job, '
+'error: Specify Parameter `s3_bucket`'
+)
+
+glue_client = self.get_conn()
+
+try:
+job_response = self.get_or_create_glue_job()
+job_name = job_response
+job_run = glue_client.start_job_run(
+JobName=job_name,
+Arguments=script_arguments
+)
+return self.job_completion(job_name, job_run['JobRunId'])
+except Exception as general_error:
+self.log.error(f'Failed to run aws glue job, error: {str(general_error)}')
+  

[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402128945
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsBaseHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type job_name: str
+:param desc: job description
+:type desc: str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed for a job
+:type concurrent_run_limit: int
+:param script_location: path to etl script either on s3 or local
+:type script_location: str
+:param conns: A list of connections used by the job
+:type conns: list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type retry_limit: int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type num_of_dpus: int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type iam_role_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be uploaded
+:type s3_bucket: str
+"""
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=1,
+ script_location=None,
+ conns=None,
+ retry_limit=0,
+ num_of_dpus=10,
+ aws_conn_id='aws_default',
+ region_name=None,
+ iam_role_name=None,
+ s3_bucket=None, *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit
+self.script_location = script_location
+self.conns = conns or ["s3"]
+self.retry_limit = retry_limit
+self.num_of_dpus = num_of_dpus
+self.aws_conn_id = aws_conn_id
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.role_name = iam_role_name
+self.S3_PROTOCOL = "s3://"
+self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/'
+self.S3_GLUE_LOGS = 'logs/glue-logs/'
+super(AwsGlueJobHook, self).__init__(*args, **kwargs)
+
+def get_conn(self):
+conn = self.get_client_type('glue', self.region_name)
+return conn
+
+def list_jobs(self):
+conn = self.get_conn()
+return conn.get_jobs()
+
+def get_iam_execution_role(self):
+"""
+:return: iam role for job execution
+"""
+iam_client = self.get_client_type('iam', self.region_name)
+
+try:
+glue_execution_role = iam_client.get_role(RoleName=self.role_name)
+self.log.info("Iam Role Name: %s", self.role_name)
+return glue_execution_role
+except Exception as general_error:
+self.log.error(f'Failed to create aws glue job, error: {str(general_error)}')
+raise
+
+def initialize_job(self, script_arguments=None):
+"""
+Initializes connection with AWS Glue
+to run job
+:return:
+"""
+if self.s3_bucket is None:
+raise AirflowException(
+'Could not initialize glue job, '
+'error: Specify Parameter `s3_bucket`'
+)
+
+glue_client = self.get_conn()
+
+try:
+job_response = self.get_or_create_glue_job()
+job_name = job_response
+job_run = glue_client.start_job_run(
+JobName=job_name,
+Arguments=script_arguments
+)
+return self.job_completion(job_name, job_run['JobRunId'])
+except Exception as general_error:
+self.log.error(f'Failed to run aws glue job, error: {str(general_error)}')
+  

[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402125287
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsBaseHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type job_name: str
+:param desc: job description
+:type desc: str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed for a job
+:type concurrent_run_limit: int
+:param script_location: path to etl script either on s3 or local
+:type script_location: str
+:param conns: A list of connections used by the job
+:type conns: list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type retry_limit: int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type num_of_dpus: int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type iam_role_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be uploaded
+:type s3_bucket: str
+"""
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=1,
+ script_location=None,
+ conns=None,
+ retry_limit=0,
+ num_of_dpus=10,
+ aws_conn_id='aws_default',
+ region_name=None,
+ iam_role_name=None,
+ s3_bucket=None, *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit
+self.script_location = script_location
+self.conns = conns or ["s3"]
+self.retry_limit = retry_limit
+self.num_of_dpus = num_of_dpus
+self.aws_conn_id = aws_conn_id
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.role_name = iam_role_name
+self.S3_PROTOCOL = "s3://"
+self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/'
+self.S3_GLUE_LOGS = 'logs/glue-logs/'
+super(AwsGlueJobHook, self).__init__(*args, **kwargs)
+
+def get_conn(self):
+conn = self.get_client_type('glue', self.region_name)
+return conn
+
+def list_jobs(self):
+conn = self.get_conn()
+return conn.get_jobs()
+
+def get_iam_execution_role(self):
+"""
+:return: iam role for job execution
+"""
+iam_client = self.get_client_type('iam', self.region_name)
+
+try:
+glue_execution_role = iam_client.get_role(RoleName=self.role_name)
+self.log.info("Iam Role Name: %s", self.role_name)
+return glue_execution_role
+except Exception as general_error:
+self.log.error(f'Failed to create aws glue job, error: {str(general_error)}')
+raise
+
+def initialize_job(self, script_arguments=None):
+"""
+Initializes connection with AWS Glue
+to run job
+:return:
+"""
+if self.s3_bucket is None:
+raise AirflowException(
+'Could not initialize glue job, '
+'error: Specify Parameter `s3_bucket`'
+)
+
+glue_client = self.get_conn()
+
+try:
+job_response = self.get_or_create_glue_job()
+job_name = job_response
 
 Review comment:
   ```suggestion
   job_name = self.get_or_create_glue_job()
   ```
   What is the point in adding another variable?



[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402153352
 
 

 ##
 File path: tests/providers/amazon/aws/sensors/test_aws_glue_job_sensor.py
 ##
 @@ -0,0 +1,69 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+from airflow import configuration
+from airflow.contrib.hooks.aws_glue_job_hook import AwsGlueJobHook
+from airflow.contrib.sensors.aws_glue_job_sensor import AwsGlueJobSensor
 
 Review comment:
   Same here.
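   For reference, the providers imports here would presumably read:
   
   ```
   from airflow.providers.amazon.aws.hooks.glue import AwsGlueJobHook
   from airflow.providers.amazon.aws.sensors.glue import AwsGlueJobSensor
   ```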




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402126919
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsBaseHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type job_name: str
+:param desc: job description
+:type desc: str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed for a job
+:type concurrent_run_limit: int
+:param script_location: path to etl script either on s3 or local
+:type script_location: str
+:param conns: A list of connections used by the job
+:type conns: list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type retry_limit: int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type num_of_dpus: int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type iam_role_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be uploaded
+:type s3_bucket: str
+"""
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=1,
+ script_location=None,
+ conns=None,
+ retry_limit=0,
+ num_of_dpus=10,
+ aws_conn_id='aws_default',
+ region_name=None,
+ iam_role_name=None,
+ s3_bucket=None, *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit
+self.script_location = script_location
+self.conns = conns or ["s3"]
+self.retry_limit = retry_limit
+self.num_of_dpus = num_of_dpus
+self.aws_conn_id = aws_conn_id
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.role_name = iam_role_name
+self.S3_PROTOCOL = "s3://"
+self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/'
+self.S3_GLUE_LOGS = 'logs/glue-logs/'
+super(AwsGlueJobHook, self).__init__(*args, **kwargs)
+
+def get_conn(self):
+conn = self.get_client_type('glue', self.region_name)
+return conn
+
+def list_jobs(self):
+conn = self.get_conn()
+return conn.get_jobs()
+
+def get_iam_execution_role(self):
+"""
+:return: iam role for job execution
+"""
+iam_client = self.get_client_type('iam', self.region_name)
+
+try:
+glue_execution_role = iam_client.get_role(RoleName=self.role_name)
+self.log.info("Iam Role Name: %s", self.role_name)
+return glue_execution_role
+except Exception as general_error:
+self.log.error(f'Failed to create aws glue job, error: {str(general_error)}')
+raise
+
+def initialize_job(self, script_arguments=None):
+"""
+Initializes connection with AWS Glue
+to run job
+:return:
+"""
+if self.s3_bucket is None:
+raise AirflowException(
+'Could not initialize glue job, '
+'error: Specify Parameter `s3_bucket`'
+)
+
+glue_client = self.get_conn()
+
+try:
+job_response = self.get_or_create_glue_job()
+job_name = job_response
+job_run = glue_client.start_job_run(
+JobName=job_name,
+Arguments=script_arguments
+)
+return self.job_completion(job_name, job_run['JobRunId'])
+except Exception as general_error:
+self.log.error(f'Failed to run aws glue job, error: {str(general_error)}')
+  

[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402123844
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsBaseHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type job_name: str
+:param desc: job description
+:type desc: str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed for a job
+:type concurrent_run_limit: int
+:param script_location: path to etl script either on s3 or local
+:type script_location: str
+:param conns: A list of connections used by the job
+:type conns: list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type retry_limit: int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type num_of_dpus: int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type iam_role_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be uploaded
+:type s3_bucket: str
+"""
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=1,
+ script_location=None,
+ conns=None,
+ retry_limit=0,
+ num_of_dpus=10,
+ aws_conn_id='aws_default',
+ region_name=None,
+ iam_role_name=None,
+ s3_bucket=None, *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit
+self.script_location = script_location
+self.conns = conns or ["s3"]
+self.retry_limit = retry_limit
+self.num_of_dpus = num_of_dpus
+self.aws_conn_id = aws_conn_id
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.role_name = iam_role_name
+self.S3_PROTOCOL = "s3://"
+self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/'
+self.S3_GLUE_LOGS = 'logs/glue-logs/'
+super(AwsGlueJobHook, self).__init__(*args, **kwargs)
+
+def get_conn(self):
+conn = self.get_client_type('glue', self.region_name)
+return conn
+
+def list_jobs(self):
+conn = self.get_conn()
+return conn.get_jobs()
+
+def get_iam_execution_role(self):
+"""
+:return: iam role for job execution
+"""
+iam_client = self.get_client_type('iam', self.region_name)
+
+try:
+glue_execution_role = iam_client.get_role(RoleName=self.role_name)
+self.log.info("Iam Role Name: %s", self.role_name)
+return glue_execution_role
+except Exception as general_error:
+self.log.error(f'Failed to create aws glue job, error: {str(general_error)}')
+raise
+
+def initialize_job(self, script_arguments=None):
+"""
+Initializes connection with AWS Glue
+to run job
+:return:
+"""
+if self.s3_bucket is None:
+raise AirflowException(
+'Could not initialize glue job, '
+'error: Specify Parameter `s3_bucket`'
+)
 
 Review comment:
   If `self.s3_bucket` is required, you should not declare it as `s3_bucket=None` (i.e. as a kwarg). Instead, use the arg `s3_bucket` without a default value.
   
   Optional parameters (kwargs) should only be used if they are really optional!
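   A minimal sketch of that pattern (the other parameters are omitted here for brevity; this is illustrative, not the final signature):
   
   ```
   from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook


   class AwsGlueJobHook(AwsBaseHook):
       def __init__(self, s3_bucket, *args, **kwargs):
           # No default value: callers must pass s3_bucket explicitly, so the
           # AirflowException guard in initialize_job becomes unnecessary.
           self.s3_bucket = s3_bucket
           super(AwsGlueJobHook, self).__init__(*args, **kwargs)
   ```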



[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402152656
 
 

 ##
 File path: tests/providers/amazon/aws/operators/test_aws_glue_job_operator.py
 ##
 @@ -0,0 +1,57 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+from airflow import configuration
+from airflow.contrib.hooks.aws_glue_job_hook import AwsGlueJobHook
+from airflow.contrib.operators.aws_glue_job_operator import AWSGlueJobOperator
 
 Review comment:
   Please use the `providers` import.
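   At the time of this review, that would be the module paths used elsewhere in this PR:
   ```python
   from airflow.providers.amazon.aws.hooks.glue import AwsGlueJobHook
   from airflow.providers.amazon.aws.operators.glue import AWSGlueJobOperator
   ```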




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402146498
 
 

 ##
 File path: airflow/providers/amazon/aws/operators/glue.py
 ##
 @@ -0,0 +1,110 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import unicode_literals
+
+from airflow.providers.amazon.aws.hooks.glue import AwsGlueJobHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class AWSGlueJobOperator(BaseOperator):
+"""
+Creates an AWS Glue Job. AWS Glue is a serverless Spark
+ETL service for running Spark Jobs on the AWS cloud.
+Language support: Python and Scala
+
+:param job_name: unique job name per AWS Account
+:type str
+:param script_location: location of ETL script. Must be a local or S3 path
+:type str
+:param job_desc: job description details
+:type str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed for a job
+:type int
+:param script_args: etl script arguments and AWS Glue arguments
+:type dict
+:param connections: AWS Glue connections to be used by the job.
+:type list
+:param retry_limit: The maximum number of times to retry this job if it fails
+:type int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job.
+:type int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be uploaded
+:type str
+:param iam_role_name: AWS IAM Role for Glue Job Execution
+:type str
+"""
+template_fields = ()
+template_ext = ()
+ui_color = '#ededed'
+
+@apply_defaults
+def __init__(self,
+ job_name='aws_glue_default_job',
+ job_desc='AWS Glue Job with Airflow',
+ script_location=None,
+ concurrent_run_limit=None,
+ script_args={},
+ connections=[],
 
 Review comment:
   `{}` and `[]` are mutable values.
   
   So please use `script_args=None` and then in the `__init__` you can do `self.script_args = script_args or {}`.
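   A minimal sketch of that pattern (surrounding parameters omitted):
   ```python
   class AWSGlueJobOperator(BaseOperator):
       def __init__(self, script_args=None, connections=None, *args, **kwargs):
           super(AWSGlueJobOperator, self).__init__(*args, **kwargs)
           # None is immutable, so it is safe as a default; the real
           # default value is created freshly per instance instead of
           # being shared between all calls
           self.script_args = script_args or {}
           self.connections = connections or []
   ```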




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402139013
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsBaseHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type job_name: str
+:param desc: job description
+:type desc: str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed for a job
+:type concurrent_run_limit: int
+:param script_location: path to etl script either on s3 or local
+:type script_location: str
+:param conns: A list of connections used by the job
+:type conns: list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type retry_limit: int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type num_of_dpus: int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type iam_role_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be uploaded
+:type s3_bucket: str
+"""
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=1,
+ script_location=None,
+ conns=None,
+ retry_limit=0,
+ num_of_dpus=10,
+ aws_conn_id='aws_default',
+ region_name=None,
+ iam_role_name=None,
+ s3_bucket=None, *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit
+self.script_location = script_location
+self.conns = conns or ["s3"]
+self.retry_limit = retry_limit
+self.num_of_dpus = num_of_dpus
+self.aws_conn_id = aws_conn_id
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.role_name = iam_role_name
+self.S3_PROTOCOL = "s3://"
+self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/'
+self.S3_GLUE_LOGS = 'logs/glue-logs/'
+super(AwsGlueJobHook, self).__init__(*args, **kwargs)
+
+def get_conn(self):
+conn = self.get_client_type('glue', self.region_name)
+return conn
+
+def list_jobs(self):
+conn = self.get_conn()
+return conn.get_jobs()
+
+def get_iam_execution_role(self):
+"""
+:return: iam role for job execution
+"""
+iam_client = self.get_client_type('iam', self.region_name)
+
+try:
+glue_execution_role = iam_client.get_role(RoleName=self.role_name)
+self.log.info("Iam Role Name: %s", self.role_name)
+return glue_execution_role
+except Exception as general_error:
+self.log.error(f'Failed to create aws glue job, error: {str(general_error)}')
+raise
 
 Review comment:
   Can you check whether you can reuse some code from `AwsBaseHook`? If you search for `iam`, you can see that IAM auth is already implemented there; maybe you can use that. WDYT?
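   One candidate, assuming the `AwsBaseHook` on this branch already ships it, is the `expand_role` helper, which resolves a role name to its ARN via IAM:
   ```python
   def get_iam_execution_role(self):
       # expand_role is inherited from AwsBaseHook; note it returns the
       # role ARN string rather than the full get_role() response
       return self.expand_role(self.role_name)
   ```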




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402121660
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsBaseHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type job_name: str
 
 Review comment:
   ```suggestion
   :type job_name: Optional[str]
   ```
   None-able strings are `Optional[str]`
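   In `typing` terms, `Optional[str]` is just shorthand for `Union[str, None]`, matching a parameter declared with a `None` default:
   ```python
   from typing import Optional, Union

   # the two annotations are equivalent; Optional[str] documents that
   # None is an allowed value alongside str
   assert Optional[str] == Union[str, None]
   ```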




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402150830
 
 

 ##
 File path: airflow/providers/amazon/aws/sensors/glue.py
 ##
 @@ -0,0 +1,57 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.providers.amazon.aws.hooks.glue import AwsGlueJobHook
+
+
+class AwsGlueJobSensor(BaseSensorOperator):
+"""
+Waits for an AWS Glue Job to reach any of the status below
+'FAILED', 'STOPPED', 'SUCCEEDED'
+
+:param job_name: The AWS Glue Job unique name
+:type str
+:param run_id: The AWS Glue current running job identifier
+:type str
+"""
+template_fields = ('job_name', 'run_id')
+
+@apply_defaults
+def __init__(self,
+ job_name,
+ run_id,
+ aws_conn_id='aws_default',
+ *args,
+ **kwargs):
+super(AwsGlueJobSensor, self).__init__(*args, **kwargs)
+self.job_name = job_name
+self.run_id = run_id
+self.aws_conn_id = aws_conn_id
+self.targeted_status = ['FAILED', 'STOPPED', 'SUCCEEDED']
+
+def poke(self, context):
+self.log.info("Poking for job run status : {self.targeted_status}\n"
+  "for Glue Job {self.job_name} and ID {self.run_id}"
+  .format(**locals()))
 
 Review comment:
   Please use `%s` syntax.
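   Rewritten with `%s` placeholders, the values are passed as arguments and only interpolated if the log level is enabled:
   ```python
   self.log.info("Poking for job run status: %s for Glue Job %s and ID %s",
                 self.targeted_status, self.job_name, self.run_id)
   ```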




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402120708
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsBaseHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type job_name: str
+:param desc: job description
+:type desc: str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed for a job
+:type concurrent_run_limit: int
+:param script_location: path to etl script either on s3 or local
+:type script_location: str
+:param conns: A list of connections used by the job
+:type conns: list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type retry_limit: int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type num_of_dpus: int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type iam_role_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be uploaded
+:type s3_bucket: str
+"""
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=1,
+ script_location=None,
+ conns=None,
+ retry_limit=0,
+ num_of_dpus=10,
+ aws_conn_id='aws_default',
+ region_name=None,
+ iam_role_name=None,
+ s3_bucket=None, *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit
+self.script_location = script_location
+self.conns = conns or ["s3"]
+self.retry_limit = retry_limit
+self.num_of_dpus = num_of_dpus
+self.aws_conn_id = aws_conn_id
 
 Review comment:
   You can still access it because of the `AwsBaseHook` implementation.
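   In other words, once the subclass forwards `aws_conn_id` on to `super().__init__()` instead of assigning it locally, the attribute is still there (a sketch, assuming the base hook stores it, as it does on current master):
   ```python
   hook = AwsGlueJobHook(aws_conn_id='aws_default')
   # AwsBaseHook.__init__ keeps the value, so subclasses inherit it
   print(hook.aws_conn_id)  # 'aws_default'
   ```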




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402142007
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsBaseHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type job_name: str
+:param desc: job description
+:type desc: str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed for a job
+:type concurrent_run_limit: int
+:param script_location: path to etl script either on s3 or local
+:type script_location: str
+:param conns: A list of connections used by the job
+:type conns: list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type retry_limit: int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type num_of_dpus: int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type iam_role_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be uploaded
+:type s3_bucket: str
+"""
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=1,
+ script_location=None,
+ conns=None,
+ retry_limit=0,
+ num_of_dpus=10,
+ aws_conn_id='aws_default',
+ region_name=None,
+ iam_role_name=None,
+ s3_bucket=None, *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit
+self.script_location = script_location
+self.conns = conns or ["s3"]
+self.retry_limit = retry_limit
+self.num_of_dpus = num_of_dpus
+self.aws_conn_id = aws_conn_id
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.role_name = iam_role_name
+self.S3_PROTOCOL = "s3://"
+self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/'
+self.S3_GLUE_LOGS = 'logs/glue-logs/'
+super(AwsGlueJobHook, self).__init__(*args, **kwargs)
+
+def get_conn(self):
+conn = self.get_client_type('glue', self.region_name)
+return conn
+
+def list_jobs(self):
+conn = self.get_conn()
+return conn.get_jobs()
+
+def get_iam_execution_role(self):
+"""
+:return: iam role for job execution
+"""
+iam_client = self.get_client_type('iam', self.region_name)
+
+try:
+glue_execution_role = iam_client.get_role(RoleName=self.role_name)
+self.log.info("Iam Role Name: %s", self.role_name)
+return glue_execution_role
+except Exception as general_error:
+self.log.error(f'Failed to create aws glue job, error: {str(general_error)}')
+raise
+
+def initialize_job(self, script_arguments=None):
+"""
+Initializes connection with AWS Glue
+to run job
+:return:
+"""
+if self.s3_bucket is None:
+raise AirflowException(
+'Could not initialize glue job, '
+'error: Specify Parameter `s3_bucket`'
+)
+
+glue_client = self.get_conn()
+
+try:
+job_response = self.get_or_create_glue_job()
+job_name = job_response
+job_run = glue_client.start_job_run(
+JobName=job_name,
+Arguments=script_arguments
+)
+return self.job_completion(job_name, job_run['JobRunId'])
+except Exception as general_error:
+self.log.error(f'Failed to run aws glue job, error: {str(general_error)}')

[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402120252
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsBaseHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type job_name: str
+:param desc: job description
+:type desc: str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed for a job
+:type concurrent_run_limit: int
+:param script_location: path to etl script either on s3 or local
+:type script_location: str
+:param conns: A list of connections used by the job
+:type conns: list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type retry_limit: int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type num_of_dpus: int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type iam_role_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be uploaded
+:type s3_bucket: str
+"""
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=1,
+ script_location=None,
+ conns=None,
+ retry_limit=0,
+ num_of_dpus=10,
+ aws_conn_id='aws_default',
+ region_name=None,
+ iam_role_name=None,
+ s3_bucket=None, *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit
+self.script_location = script_location
+self.conns = conns or ["s3"]
+self.retry_limit = retry_limit
+self.num_of_dpus = num_of_dpus
+self.aws_conn_id = aws_conn_id
 
 Review comment:
   ```suggestion
   ```




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402152456
 
 

 ##
 File path: tests/providers/amazon/aws/hooks/test_aws_glue_job_hook.py
 ##
 @@ -0,0 +1,115 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+import json
+
+
+try:
+from unittest import mock
+except ImportError:
+try:
+import mock
+except ImportError:
+mock = None
+
+try:
+from moto import mock_iam
+except ImportError:
+mock_iam = None
+
+
+from airflow.contrib.hooks.aws_glue_job_hook import AwsGlueJobHook
 
 Review comment:
   Please use the `providers` import.




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402130497
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsBaseHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type job_name: str
+:param desc: job description
+:type desc: str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed for a job
+:type concurrent_run_limit: int
+:param script_location: path to etl script either on s3 or local
+:type script_location: str
+:param conns: A list of connections used by the job
+:type conns: list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type retry_limit: int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type num_of_dpus: int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type iam_role_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be uploaded
+:type s3_bucket: str
+"""
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=1,
+ script_location=None,
+ conns=None,
+ retry_limit=0,
+ num_of_dpus=10,
+ aws_conn_id='aws_default',
+ region_name=None,
+ iam_role_name=None,
+ s3_bucket=None, *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit
+self.script_location = script_location
+self.conns = conns or ["s3"]
+self.retry_limit = retry_limit
+self.num_of_dpus = num_of_dpus
+self.aws_conn_id = aws_conn_id
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.role_name = iam_role_name
+self.S3_PROTOCOL = "s3://"
+self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/'
+self.S3_GLUE_LOGS = 'logs/glue-logs/'
+super(AwsGlueJobHook, self).__init__(*args, **kwargs)
+
+def get_conn(self):
+conn = self.get_client_type('glue', self.region_name)
+return conn
+
+def list_jobs(self):
+conn = self.get_conn()
+return conn.get_jobs()
+
+def get_iam_execution_role(self):
+"""
+:return: iam role for job execution
+"""
+iam_client = self.get_client_type('iam', self.region_name)
+
+try:
+glue_execution_role = iam_client.get_role(RoleName=self.role_name)
+self.log.info("Iam Role Name: %s", self.role_name)
+return glue_execution_role
+except Exception as general_error:
+self.log.error(f'Failed to create aws glue job, error: {str(general_error)}')
+raise
+
+def initialize_job(self, script_arguments=None):
+"""
+Initializes connection with AWS Glue
+to run job
+:return:
+"""
+if self.s3_bucket is None:
+raise AirflowException(
+'Could not initialize glue job, '
+'error: Specify Parameter `s3_bucket`'
+)
+
+glue_client = self.get_conn()
+
+try:
+job_response = self.get_or_create_glue_job()
+job_name = job_response
+job_run = glue_client.start_job_run(
+JobName=job_name,
+Arguments=script_arguments
+)
+return self.job_completion(job_name, job_run['JobRunId'])
+except Exception as general_error:
+self.log.error(f'Failed to run aws glue job, error: {str(general_error)}')

[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-02-28 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r385614534
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.contrib.hooks.aws_hook import AwsHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type str
+:param desc: job description
+:type str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed for a job
+:type int
+:param script_location: path to etl script either on s3 or local
+:type str
+:param conns: A list of connections used by the job
+:type list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be uploaded
+:type str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type str
+"""
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=None,
+ script_location=None,
+ conns=None,
+ retry_limit=None,
+ num_of_dpus=None,
+ aws_conn_id='aws_default',
+ region_name=None,
+ iam_role_name=None,
+ s3_bucket=None, *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit or 1
+self.script_location = script_location
+self.conns = conns or ["s3"]
 
 Review comment:
   In the case of lists, it is correct to do `var or []`.
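   A short sketch of why the `or` fallback beats a mutable default:
   ```python
   def bad(conns=[]):       # one list object shared by every call
       conns.append("s3")
       return conns

   def good(conns=None):    # fresh list per call
       conns = conns or []
       conns.append("s3")
       return conns

   bad(); print(bad())      # ['s3', 's3'] - state leaks between calls
   good(); print(good())    # ['s3']
   ```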




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-02-28 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r385616395
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.contrib.hooks.aws_hook import AwsHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type str
+:param desc: job description
+:type str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed for a job
+:type int
+:param script_location: path to etl script either on s3 or local
+:type str
+:param conns: A list of connections used by the job
+:type list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be uploaded
+:type str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type str
+"""
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=None,
+ script_location=None,
+ conns=None,
+ retry_limit=None,
+ num_of_dpus=None,
+ aws_conn_id='aws_default',
+ region_name=None,
+ iam_role_name=None,
+ s3_bucket=None, *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit or 1
+self.script_location = script_location
+self.conns = conns or ["s3"]
+self.retry_limit = retry_limit or 0
+self.num_of_dpus = num_of_dpus or 10
+self.aws_conn_id = aws_conn_id
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.role_name = iam_role_name
+self.S3_PROTOCOL = "s3://"
+self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/'
+self.S3_GLUE_LOGS = 'logs/glue-logs/'
+super(AwsGlueJobHook, self).__init__(*args, **kwargs)
+
+def get_conn(self):
+conn = self.get_client_type('glue', self.region_name)
+return conn
+
+def list_jobs(self):
+conn = self.get_conn()
+return conn.get_jobs()
+
+def get_iam_execution_role(self):
+"""
+:return: iam role for job execution
+"""
+iam_client = self.get_client_type('iam', self.region_name)
+
+try:
+glue_execution_role = iam_client.get_role(RoleName=self.role_name)
+self.log.info("Iam Role Name: {}".format(self.role_name))
 
 Review comment:
   For log statements you can use this:
   ```suggestion
   self.log.info("Iam Role Name: %s", self.role_name)
   ```




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-02-28 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r385612150
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.contrib.hooks.aws_hook import AwsHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type str
 
 Review comment:
   ```suggestion
   :type job_name: str
   ```
   
   Please add the variable name to the type definition.




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-02-28 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r385612263
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.contrib.hooks.aws_hook import AwsHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type str
 
 Review comment:
   There are more of it below.




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-02-28 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r385613741
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.contrib.hooks.aws_hook import AwsHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type str
+:param desc: job description
+:type str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed for a job
+:type int
+:param script_location: path to etl script either on s3 or local
+:type str
+:param conns: A list of connections used by the job
+:type list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be uploaded
+:type str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type str
+"""
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=None,
 
 Review comment:
   ```suggestion
concurrent_run_limit=1,
   ```
   You can set the default here. This way you don't need to do something like `concurrent_run_limit or 1`.
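   As a before/after sketch:
   ```python
   class Before:
       def __init__(self, concurrent_run_limit=None):
           # the real default is hidden inside the body
           self.concurrent_run_limit = concurrent_run_limit or 1

   class After:
       def __init__(self, concurrent_run_limit=1):
           # immutable defaults (ints, strings, None) are safe in the
           # signature, where help() and IDEs can also see them
           self.concurrent_run_limit = concurrent_run_limit
   ```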




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-02-28 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r385625340
 
 

 ##
 File path: airflow/providers/amazon/aws/operators/glue.py
 ##
 @@ -0,0 +1,115 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import unicode_literals
+
+from airflow.contrib.hooks.aws_glue_job_hook import AwsGlueJobHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class AWSGlueJobOperator(BaseOperator):
+"""
+Creates an AWS Glue Job. AWS Glue is a serverless Spark
+ETL service for running Spark Jobs on the AWS cloud.
+Language support: Python and Scala
+:param job_name: unique job name per AWS Account
+:type str
+:param script_location: location of ETL script. Must be a local or S3 path
+:type str
+:param job_desc: job description details
+:type str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed for a job
+:type int
+:param script_args: etl script arguments and AWS Glue arguments
+:type dict
+:param connections: AWS Glue connections to be used by the job.
+:type list
+:param retry_limit: The maximum number of times to retry this job if it fails
+:type int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job.
+:type int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be uploaded
+:type str
+:param iam_role_name: AWS IAM Role for Glue Job Execution
+:type str
+"""
+template_fields = ()
+template_ext = ()
+ui_color = '#ededed'
+
+@apply_defaults
+def __init__(self,
+ job_name='aws_glue_default_job',
+ job_desc='AWS Glue Job with Airflow',
+ script_location=None,
+ concurrent_run_limit=None,
+ script_args={},
+ connections=[],
+ retry_limit=None,
+ num_of_dpus=6,
+ aws_conn_id='aws_default',
+ region_name=None,
+ s3_bucket=None,
+ iam_role_name=None,
+ *args, **kwargs
+ ):
+super(AWSGlueJobOperator, self).__init__(*args, **kwargs)
+self.job_name = job_name
+self.job_desc = job_desc
+self.script_location = script_location
+self.concurrent_run_limit = concurrent_run_limit
+self.script_args = script_args
+self.connections = connections
+self.retry_limit = retry_limit
+self.num_of_dpus = num_of_dpus
+self.aws_conn_id = aws_conn_id,
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.iam_role_name = iam_role_name
+
+def execute(self, context):
+"""
+Executes AWS Glue Job from Airflow
+:return:
+"""
+task_instance = context['ti']
+glue_job = AwsGlueJobHook(job_name=self.job_name,
+  desc=self.job_desc,
+  concurrent_run_limit=self.concurrent_run_limit,
+  script_location=self.script_location,
+  conns=self.connections,
+  retry_limit=self.retry_limit,
+  num_of_dpus=self.num_of_dpus,
+  aws_conn_id=self.aws_conn_id,
+  region_name=self.region_name,
+  s3_bucket=self.s3_bucket,
+  iam_role_name=self.iam_role_name)
+
+self.log.info("Initializing AWS Glue Job: {}".format(self.job_name))
 
 Review comment:
   You can use the `%s` log formatting ;)
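   Applied here, that would read:
   ```python
   self.log.info("Initializing AWS Glue Job: %s", self.job_name)
   ```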



[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-02-28 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r385616988
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.contrib.hooks.aws_hook import AwsHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type str
+:param desc: job description
+:type str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed for a job
+:type int
+:param script_location: path to etl script either on s3 or local
+:type str
+:param conns: A list of connections used by the job
+:type list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be uploaded
+:type str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type str
+"""
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=None,
+ script_location=None,
+ conns=None,
+ retry_limit=None,
+ num_of_dpus=None,
+ aws_conn_id='aws_default',
+ region_name=None,
+ iam_role_name=None,
+ s3_bucket=None, *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit or 1
+self.script_location = script_location
+self.conns = conns or ["s3"]
+self.retry_limit = retry_limit or 0
+self.num_of_dpus = num_of_dpus or 10
+self.aws_conn_id = aws_conn_id
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.role_name = iam_role_name
+self.S3_PROTOCOL = "s3://"
+self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/'
+self.S3_GLUE_LOGS = 'logs/glue-logs/'
+super(AwsGlueJobHook, self).__init__(*args, **kwargs)
+
+def get_conn(self):
+conn = self.get_client_type('glue', self.region_name)
+return conn
+
+def list_jobs(self):
+conn = self.get_conn()
+return conn.get_jobs()
+
+def get_iam_execution_role(self):
+"""
+:return: iam role for job execution
+"""
+iam_client = self.get_client_type('iam', self.region_name)
+
+try:
+glue_execution_role = iam_client.get_role(RoleName=self.role_name)
+self.log.info("Iam Role Name: {}".format(self.role_name))
+return glue_execution_role
+except Exception as general_error:
+raise AirflowException(
+'Failed to create aws glue job, error: {error}'.format(
+error=str(general_error)
+)
 
 Review comment:
   And for general string formatting you can now use f-strings:
   ```suggestion
   f'Failed to create aws glue job, error: {str(general_error)}'
   ```
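   So the exception branch could become (the `str()` call is also redundant inside an f-string):
   ```python
   raise AirflowException(
       f'Failed to create aws glue job, error: {general_error}'
   )
   ```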






[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-02-28 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r385611862
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.contrib.hooks.aws_hook import AwsHook
 
 Review comment:
   Please import the AWS hook from the `providers` package.
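   i.e. the import at the top of the hook becomes:
   ```python
   from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
   ```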




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-02-28 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r385624472
 
 

 ##
 File path: airflow/providers/amazon/aws/operators/glue.py
 ##
 @@ -0,0 +1,115 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import unicode_literals
+
+from airflow.contrib.hooks.aws_glue_job_hook import AwsGlueJobHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class AWSGlueJobOperator(BaseOperator):
+"""
+Creates an AWS Glue Job. AWS Glue is a serverless Spark
+ETL service for running Spark Jobs on the AWS cloud.
+Language support: Python and Scala
+:param job_name: unique job name per AWS Account
 
 Review comment:
   ```suggestion
   Language support: Python and Scala
   
   :param job_name: unique job name per AWS Account
   ```
   
   There needs to be an empty line between the documentation header and the param section; without it, reStructuredText does not recognize the `:param:` lines as a field list and renders them as part of the summary paragraph.
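   
   A minimal illustration of the expected layout, abbreviated to a single param:
   
   ```python
   class AWSGlueJobOperator(BaseOperator):
       """
       Creates an AWS Glue Job. AWS Glue is a serverless Spark
       ETL service for running Spark Jobs on the AWS cloud.
       Language support: Python and Scala
   
       :param job_name: unique job name per AWS Account
       :type job_name: str
       """
   ```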




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-02-28 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r385623765
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.contrib.hooks.aws_hook import AwsHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type str
+:param desc: job description
+:type str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed 
for a job
+:type int
+:param script_location: path to etl script either on s3 or local
+:type str
+:param conns: A list of connections used by the job
+:type list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be 
uploaded
+:type str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type str
+"""
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=None,
+ script_location=None,
+ conns=None,
+ retry_limit=None,
+ num_of_dpus=None,
+ aws_conn_id='aws_default',
+ region_name=None,
+ iam_role_name=None,
+ s3_bucket=None, *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit or 1
+self.script_location = script_location
+self.conns = conns or ["s3"]
+self.retry_limit = retry_limit or 0
+self.num_of_dpus = num_of_dpus or 10
+self.aws_conn_id = aws_conn_id
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.role_name = iam_role_name
+self.S3_PROTOCOL = "s3://"
+self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/'
+self.S3_GLUE_LOGS = 'logs/glue-logs/'
+super(AwsGlueJobHook, self).__init__(*args, **kwargs)
+
+def get_conn(self):
+conn = self.get_client_type('glue', self.region_name)
+return conn
+
+def list_jobs(self):
+conn = self.get_conn()
+return conn.get_jobs()
+
+def get_iam_execution_role(self):
+"""
+:return: iam role for job execution
+"""
+iam_client = self.get_client_type('iam', self.region_name)
+
+try:
+glue_execution_role = iam_client.get_role(RoleName=self.role_name)
+self.log.info("Iam Role Name: {}".format(self.role_name))
+return glue_execution_role
+except Exception as general_error:
+raise AirflowException(
+'Failed to create aws glue job, error: {error}'.format(
+error=str(general_error)
+)
+)
+
+def initialize_job(self, script_arguments=None):
+"""
+Initializes connection with AWS Glue
+to run job
+:return:
+"""
+if self.s3_bucket is None:
+raise AirflowException(
+'Could not initialize glue job, '
+'error: Specify Parameter `s3_bucket`'
+)
+
+glue_client = self.get_conn()
+
+try:
+job_response = self.get_or_create_glue_job()
+job_name = job_response
+job_run = glue_client.start_job_run(
+JobName=job_name,
+Arguments=script_arguments
+)
+return self.job_completion(job_name, job_run['JobRunId'])
+except Exception as general_error:
+raise AirflowException(
+'Failed to run aws glue job, error: {error}'.format(
+ 

[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-02-28 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r385615455
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.contrib.hooks.aws_hook import AwsHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type str
+:param desc: job description
+:type str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed 
for a job
+:type int
+:param script_location: path to etl script either on s3 or local
+:type str
+:param conns: A list of connections used by the job
+:type list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be 
uploaded
+:type str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type str
+"""
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=None,
+ script_location=None,
+ conns=None,
+ retry_limit=None,
+ num_of_dpus=None,
+ aws_conn_id='aws_default',
+ region_name=None,
+ iam_role_name=None,
+ s3_bucket=None, *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit or 1
+self.script_location = script_location
+self.conns = conns or ["s3"]
+self.retry_limit = retry_limit or 0
+self.num_of_dpus = num_of_dpus or 10
+self.aws_conn_id = aws_conn_id
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.role_name = iam_role_name
+self.S3_PROTOCOL = "s3://"
+self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/'
+self.S3_GLUE_LOGS = 'logs/glue-logs/'
+super(AwsGlueJobHook, self).__init__(*args, **kwargs)
+
+def get_conn(self):
+conn = self.get_client_type('glue', self.region_name)
+return conn
 
 Review comment:
   Please note that https://github.com/apache/airflow/pull/7541 will remove the need for `get_conn`. So you may need to rebase and change this if the other PR gets merged first.
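   
   For reference, a sketch of what the hook constructor could look like after rebasing on #7541, assuming the reworked base hook accepts a `client_type` and provides `get_conn()` itself:
   
   ```python
   from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
   
   
   class AwsGlueJobHook(AwsBaseHook):
       def __init__(self, region_name=None, *args, **kwargs):
           self.region_name = region_name
           # the base hook builds the boto3 glue client, so no get_conn override is needed
           super(AwsGlueJobHook, self).__init__(client_type='glue', *args, **kwargs)
   ```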




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-02-28 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r385626287
 
 

 ##
 File path: airflow/providers/amazon/aws/operators/glue.py
 ##
 @@ -0,0 +1,115 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import unicode_literals
+
+from airflow.contrib.hooks.aws_glue_job_hook import AwsGlueJobHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class AWSGlueJobOperator(BaseOperator):
+"""
+Creates an AWS Glue Job. AWS Glue is a serverless Spark
+ETL service for running Spark Jobs on the AWS cloud.
+Language support: Python and Scala
+:param job_name: unique job name per AWS Account
+:type str
+:param script_location: location of ETL script. Must be a local or S3 path
+:type str
+:param job_desc: job description details
+:type str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed 
for a job
+:type int
+:param script_args: etl script arguments and AWS Glue arguments
+:type dict
+:param connections: AWS Glue connections to be used by the job.
+:type list
+:param retry_limit: The maximum number of times to retry this job if it 
fails
+:type int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job.
+:type int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be 
uploaded
+:type str
+:param iam_role_name: AWS IAM Role for Glue Job Execution
+:type str
+"""
+template_fields = ()
+template_ext = ()
+ui_color = '#ededed'
+
+@apply_defaults
+def __init__(self,
+ job_name='aws_glue_default_job',
+ job_desc='AWS Glue Job with Airflow',
+ script_location=None,
+ concurrent_run_limit=None,
+ script_args={},
+ connections=[],
+ retry_limit=None,
+ num_of_dpus=6,
+ aws_conn_id='aws_default',
+ region_name=None,
+ s3_bucket=None,
+ iam_role_name=None,
+ *args, **kwargs
+ ):
+super(AWSGlueJobOperator, self).__init__(*args, **kwargs)
+self.job_name = job_name
+self.job_desc = job_desc
+self.script_location = script_location
+self.concurrent_run_limit = concurrent_run_limit
+self.script_args = script_args
+self.connections = connections
+self.retry_limit = retry_limit
+self.num_of_dpus = num_of_dpus
+self.aws_conn_id = aws_conn_id,
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.iam_role_name = iam_role_name
+
+def execute(self, context):
+"""
+Executes AWS Glue Job from Airflow
+:return:
+"""
+task_instance = context['ti']
+glue_job = AwsGlueJobHook(job_name=self.job_name,
+  desc=self.job_desc,
+  concurrent_run_limit=self.concurrent_run_limit,
+  script_location=self.script_location,
+  conns=self.connections,
+  retry_limit=self.retry_limit,
+  num_of_dpus=self.num_of_dpus,
+  aws_conn_id=self.aws_conn_id,
+  region_name=self.region_name,
+  s3_bucket=self.s3_bucket,
+  iam_role_name=self.iam_role_name)
+
+self.log.info("Initializing AWS Glue Job: {}".format(self.job_name))
+glue_job_run = glue_job.initialize_job(self.script_args)
+task_instance.xcom_push(key='run_id', value=glue_job_run['JobRunId'])
+self.log.info('AWS Glue Job: {job_name} status: {job_status}. Run Id: {run_id}'
+  .format(run_id=glue_job_run['JobRunId'],
+ 

[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-02-28 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r385619893
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.contrib.hooks.aws_hook import AwsHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type str
+:param desc: job description
+:type str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed 
for a job
+:type int
+:param script_location: path to etl script either on s3 or local
+:type str
+:param conns: A list of connections used by the job
+:type list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be 
uploaded
+:type str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type str
+"""
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=None,
+ script_location=None,
+ conns=None,
+ retry_limit=None,
+ num_of_dpus=None,
+ aws_conn_id='aws_default',
+ region_name=None,
+ iam_role_name=None,
+ s3_bucket=None, *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit or 1
+self.script_location = script_location
+self.conns = conns or ["s3"]
+self.retry_limit = retry_limit or 0
+self.num_of_dpus = num_of_dpus or 10
+self.aws_conn_id = aws_conn_id
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.role_name = iam_role_name
+self.S3_PROTOCOL = "s3://"
+self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/'
+self.S3_GLUE_LOGS = 'logs/glue-logs/'
+super(AwsGlueJobHook, self).__init__(*args, **kwargs)
+
+def get_conn(self):
+conn = self.get_client_type('glue', self.region_name)
+return conn
+
+def list_jobs(self):
+conn = self.get_conn()
+return conn.get_jobs()
+
+def get_iam_execution_role(self):
+"""
+:return: iam role for job execution
+"""
+iam_client = self.get_client_type('iam', self.region_name)
+
+try:
+glue_execution_role = iam_client.get_role(RoleName=self.role_name)
+self.log.info("Iam Role Name: {}".format(self.role_name))
+return glue_execution_role
+except Exception as general_error:
+raise AirflowException(
+'Failed to create aws glue job, error: {error}'.format(
+error=str(general_error)
+)
 
 Review comment:
   This looks to me like you only want to add `Failed to create aws glue job`, because the rest is the same as for any other exception that is raised.
   
   So you could also do it like this:
   ```suggestion
   except Exception as general_error:
   self.log.error('Failed to create aws glue job')
   raise
   ```
   
   What do you think?
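   
   The reasoning behind the suggestion: a bare `raise` re-raises the original exception with its type and traceback intact, while re-raising as `AirflowException` replaces the exception type. A tiny illustration with a stand-in function, not the PR code:
   
   ```python
   import logging
   
   log = logging.getLogger(__name__)
   
   
   def get_iam_execution_role():
       try:
           raise ValueError("role not found")   # stand-in for the boto3 call
       except Exception:
           log.error('Failed to create aws glue job')
           raise  # the original ValueError propagates with its full traceback
   ```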




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-02-28 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r385613906
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.contrib.hooks.aws_hook import AwsHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type str
+:param desc: job description
+:type str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed 
for a job
+:type int
+:param script_location: path to etl script either on s3 or local
+:type str
+:param conns: A list of connections used by the job
+:type list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be 
uploaded
+:type str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type str
+"""
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=None,
+ script_location=None,
+ conns=None,
+ retry_limit=None,
+ num_of_dpus=None,
+ aws_conn_id='aws_default',
+ region_name=None,
+ iam_role_name=None,
+ s3_bucket=None, *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit or 1
+self.script_location = script_location
+self.conns = conns or ["s3"]
+self.retry_limit = retry_limit or 0
+self.num_of_dpus = num_of_dpus or 10
 
 Review comment:
   Same for those: plain numeric defaults can go directly into the signature (e.g. `retry_limit=0`, `num_of_dpus=10`) instead of defaulting to `None` and applying `or`.
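   
   The distinction matters for numbers: `value or default` silently replaces a legitimate falsy argument such as `0`, so a plain signature default is the safer spelling here (illustrative sketch):
   
   ```python
   def with_or(num_of_dpus=None):
       return num_of_dpus or 10
   
   
   def with_default(num_of_dpus=10):
       return num_of_dpus
   
   
   print(with_or(0))       # 10 - an explicit 0 is swallowed by `or`
   print(with_default(0))  # 0  - the caller's value survives
   ```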




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-02-28 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r385621263
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.contrib.hooks.aws_hook import AwsHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type str
+:param desc: job description
+:type str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed 
for a job
+:type int
+:param script_location: path to etl script either on s3 or local
+:type str
+:param conns: A list of connections used by the job
+:type list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be 
uploaded
+:type str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type str
+"""
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=None,
+ script_location=None,
+ conns=None,
+ retry_limit=None,
+ num_of_dpus=None,
+ aws_conn_id='aws_default',
+ region_name=None,
+ iam_role_name=None,
+ s3_bucket=None, *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit or 1
+self.script_location = script_location
+self.conns = conns or ["s3"]
+self.retry_limit = retry_limit or 0
+self.num_of_dpus = num_of_dpus or 10
+self.aws_conn_id = aws_conn_id
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.role_name = iam_role_name
+self.S3_PROTOCOL = "s3://"
+self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/'
+self.S3_GLUE_LOGS = 'logs/glue-logs/'
+super(AwsGlueJobHook, self).__init__(*args, **kwargs)
+
+def get_conn(self):
+conn = self.get_client_type('glue', self.region_name)
+return conn
+
+def list_jobs(self):
+conn = self.get_conn()
+return conn.get_jobs()
+
+def get_iam_execution_role(self):
+"""
+:return: iam role for job execution
+"""
+iam_client = self.get_client_type('iam', self.region_name)
+
+try:
+glue_execution_role = iam_client.get_role(RoleName=self.role_name)
+self.log.info("Iam Role Name: {}".format(self.role_name))
+return glue_execution_role
+except Exception as general_error:
+raise AirflowException(
+'Failed to create aws glue job, error: {error}'.format(
+error=str(general_error)
+)
+)
+
+def initialize_job(self, script_arguments=None):
+"""
+Initializes connection with AWS Glue
+to run job
+:return:
+"""
+if self.s3_bucket is None:
+raise AirflowException(
+'Could not initialize glue job, '
+'error: Specify Parameter `s3_bucket`'
+)
+
+glue_client = self.get_conn()
+
+try:
+job_response = self.get_or_create_glue_job()
+job_name = job_response
+job_run = glue_client.start_job_run(
+JobName=job_name,
+Arguments=script_arguments
+)
+return self.job_completion(job_name, job_run['JobRunId'])
+except Exception as general_error:
+raise AirflowException(
+'Failed to run aws glue job, error: {error}'.format(
+ 

[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-02-28 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r385621421
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.contrib.hooks.aws_hook import AwsHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type str
+:param desc: job description
+:type str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed 
for a job
+:type int
+:param script_location: path to etl script either on s3 or local
+:type str
+:param conns: A list of connections used by the job
+:type list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be 
uploaded
+:type str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type str
+"""
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=None,
+ script_location=None,
+ conns=None,
+ retry_limit=None,
+ num_of_dpus=None,
+ aws_conn_id='aws_default',
+ region_name=None,
+ iam_role_name=None,
+ s3_bucket=None, *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit or 1
+self.script_location = script_location
+self.conns = conns or ["s3"]
+self.retry_limit = retry_limit or 0
+self.num_of_dpus = num_of_dpus or 10
+self.aws_conn_id = aws_conn_id
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.role_name = iam_role_name
+self.S3_PROTOCOL = "s3://"
+self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/'
+self.S3_GLUE_LOGS = 'logs/glue-logs/'
+super(AwsGlueJobHook, self).__init__(*args, **kwargs)
+
+def get_conn(self):
+conn = self.get_client_type('glue', self.region_name)
+return conn
+
+def list_jobs(self):
+conn = self.get_conn()
+return conn.get_jobs()
+
+def get_iam_execution_role(self):
+"""
+:return: iam role for job execution
+"""
+iam_client = self.get_client_type('iam', self.region_name)
+
+try:
+glue_execution_role = iam_client.get_role(RoleName=self.role_name)
+self.log.info("Iam Role Name: {}".format(self.role_name))
+return glue_execution_role
+except Exception as general_error:
+raise AirflowException(
+'Failed to create aws glue job, error: {error}'.format(
+error=str(general_error)
+)
+)
+
+def initialize_job(self, script_arguments=None):
+"""
+Initializes connection with AWS Glue
+to run job
+:return:
+"""
+if self.s3_bucket is None:
+raise AirflowException(
+'Could not initialize glue job, '
+'error: Specify Parameter `s3_bucket`'
+)
+
+glue_client = self.get_conn()
+
+try:
+job_response = self.get_or_create_glue_job()
+job_name = job_response
+job_run = glue_client.start_job_run(
+JobName=job_name,
+Arguments=script_arguments
+)
+return self.job_completion(job_name, job_run['JobRunId'])
+except Exception as general_error:
+raise AirflowException(
+'Failed to run aws glue job, error: {error}'.format(
+ 

[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-02-28 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r385622350
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.contrib.hooks.aws_hook import AwsHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type str
+:param desc: job description
+:type str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed 
for a job
+:type int
+:param script_location: path to etl script either on s3 or local
+:type str
+:param conns: A list of connections used by the job
+:type list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be 
uploaded
+:type str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type str
+"""
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=None,
+ script_location=None,
+ conns=None,
+ retry_limit=None,
+ num_of_dpus=None,
+ aws_conn_id='aws_default',
+ region_name=None,
+ iam_role_name=None,
+ s3_bucket=None, *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit or 1
+self.script_location = script_location
+self.conns = conns or ["s3"]
+self.retry_limit = retry_limit or 0
+self.num_of_dpus = num_of_dpus or 10
+self.aws_conn_id = aws_conn_id
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.role_name = iam_role_name
+self.S3_PROTOCOL = "s3://"
+self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/'
+self.S3_GLUE_LOGS = 'logs/glue-logs/'
+super(AwsGlueJobHook, self).__init__(*args, **kwargs)
+
+def get_conn(self):
+conn = self.get_client_type('glue', self.region_name)
+return conn
+
+def list_jobs(self):
+conn = self.get_conn()
+return conn.get_jobs()
+
+def get_iam_execution_role(self):
+"""
+:return: iam role for job execution
+"""
+iam_client = self.get_client_type('iam', self.region_name)
+
+try:
+glue_execution_role = iam_client.get_role(RoleName=self.role_name)
+self.log.info("Iam Role Name: {}".format(self.role_name))
+return glue_execution_role
+except Exception as general_error:
+raise AirflowException(
+'Failed to create aws glue job, error: {error}'.format(
+error=str(general_error)
+)
+)
+
+def initialize_job(self, script_arguments=None):
+"""
+Initializes connection with AWS Glue
+to run job
+:return:
+"""
+if self.s3_bucket is None:
+raise AirflowException(
+'Could not initialize glue job, '
+'error: Specify Parameter `s3_bucket`'
+)
+
+glue_client = self.get_conn()
+
+try:
+job_response = self.get_or_create_glue_job()
+job_name = job_response
+job_run = glue_client.start_job_run(
+JobName=job_name,
+Arguments=script_arguments
+)
+return self.job_completion(job_name, job_run['JobRunId'])
+except Exception as general_error:
+raise AirflowException(
+'Failed to run aws glue job, error: {error}'.format(
+ 

[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-02-28 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r385613153
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.contrib.hooks.aws_hook import AwsHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type str
+:param desc: job description
+:type str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed 
for a job
+:type int
+:param script_location: path to etl script either on s3 or local
+:type str
+:param conns: A list of connections used by the job
+:type list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be 
uploaded
+:type str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type str
 
 Review comment:
   These are listed in the wrong order: in `__init__`, `iam_role_name` comes before `s3_bucket`, so the docstring should match. Or change the order in the `__init__` :)




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-02-27 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r385213897
 
 

 ##
 File path: docs/integration.rst
 ##
 @@ -17,7 +17,6 @@
 
 Integration
 ===
-
 
 Review comment:
   And please check what you commit before you push :) `git status`




[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-02-27 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r385209674
 
 

 ##
 File path: docs/integration.rst
 ##
 @@ -17,7 +17,6 @@
 
 Integration
 ===
-
 
 Review comment:
   Reset your commit and remove this file from being committed. 
https://stackoverflow.com/a/15321456


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-02-27 Thread GitBox
feluelle commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r385107981
 
 

 ##
 File path: docs/integration.rst
 ##
 @@ -17,7 +17,6 @@
 
 Integration
 ===
-
 
 Review comment:
   Please do not include this change.

