[GitHub] [airflow] abdulbasitds commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-02-27 Thread GitBox
abdulbasitds commented on a change in pull request #6007: [AIRFLOW-2310] Enable 
AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r385110737
 
 

 ##
 File path: docs/integration.rst
 ##
 @@ -17,7 +17,6 @@
 
 Integration
 ===
-
 
 Review comment:
  Thanks! How can I exclude it? There is a 'delete file' option, but will it delete the file from the repo?




[GitHub] [airflow] abdulbasitds commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-02-27 Thread GitBox
abdulbasitds commented on a change in pull request #6007: [AIRFLOW-2310] Enable 
AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r385377395
 
 

 ##
 File path: docs/integration.rst
 ##
 @@ -17,7 +17,6 @@
 
 Integration
 ===
-
 
 Review comment:
  Hi, I am sorry, but resetting the commit and removing the file somehow doesn't make sense to me (reverting to some commit will still contain the things you wanted me to remove) and would eventually complicate things. Can I check out this file from apache/master and commit that?
   
   ```
   git checkout apache/master -- docs/integration.rst
   git commit -m "removing a file from PR"
   git push origin master
   ```




[GitHub] [airflow] abdulbasitds commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-02-27 Thread GitBox
abdulbasitds commented on a change in pull request #6007: [AIRFLOW-2310] Enable 
AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r385377395
 
 

 ##
 File path: docs/integration.rst
 ##
 @@ -17,7 +17,6 @@
 
 Integration
 ===
-
 
 Review comment:
  Hi, I am sorry, but resetting the commit and removing the file somehow doesn't make sense to me (reverting to some commit will still contain the things you wanted me to remove) and would eventually complicate things. Instead, I checked out this file from apache/master and committed it as follows. It no longer appears in the changed files either; is that okay?
   
   ```
   git checkout apache/master -- docs/integration.rst
   git commit -m "removing a file from PR"
   git push origin master
   ```




[GitHub] [airflow] abdulbasitds commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-03-24 Thread GitBox
abdulbasitds commented on a change in pull request #6007: [AIRFLOW-2310] Enable 
AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r397106655
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.contrib.hooks.aws_hook import AwsHook
 
 Review comment:
  There is a base hook in providers; I can use that.
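
  For context, a minimal sketch of that switch, assuming the `airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook` path that the later commits in this thread use (the class and method names below just mirror the hunk above):

   ```
   # Sketch only: swap the deprecated contrib import for the providers base hook;
   # the rest of the hook body can stay as in the hunk above.
   from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook


   class AwsGlueJobHook(AwsBaseHook):
       def __init__(self, region_name=None, *args, **kwargs):
           self.region_name = region_name
           super().__init__(*args, **kwargs)

       def get_conn(self):
           # get_client_type is inherited from the base hook and
           # returns a boto3 client for the named service
           return self.get_client_type('glue', self.region_name)
   ```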




[GitHub] [airflow] abdulbasitds commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-03-24 Thread GitBox
abdulbasitds commented on a change in pull request #6007: [AIRFLOW-2310] Enable 
AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r397129801
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.contrib.hooks.aws_hook import AwsHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type str
+:param desc: job description
+:type str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed 
for a job
+:type int
+:param script_location: path to etl script either on s3 or local
+:type str
+:param conns: A list of connections used by the job
+:type list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be 
uploaded
+:type str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type str
+"""
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=None,
+ script_location=None,
+ conns=None,
+ retry_limit=None,
+ num_of_dpus=None,
+ aws_conn_id='aws_default',
+ region_name=None,
+ iam_role_name=None,
+ s3_bucket=None, *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit or 1
+self.script_location = script_location
+self.conns = conns or ["s3"]
+self.retry_limit = retry_limit or 0
+self.num_of_dpus = num_of_dpus or 10
+self.aws_conn_id = aws_conn_id
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.role_name = iam_role_name
+self.S3_PROTOCOL = "s3://"
+self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/'
+self.S3_GLUE_LOGS = 'logs/glue-logs/'
+super(AwsGlueJobHook, self).__init__(*args, **kwargs)
+
+def get_conn(self):
+conn = self.get_client_type('glue', self.region_name)
+return conn
+
+def list_jobs(self):
+conn = self.get_conn()
+return conn.get_jobs()
+
+def get_iam_execution_role(self):
+"""
+:return: iam role for job execution
+"""
+iam_client = self.get_client_type('iam', self.region_name)
+
+try:
+glue_execution_role = iam_client.get_role(RoleName=self.role_name)
+self.log.info("Iam Role Name: {}".format(self.role_name))
+return glue_execution_role
+except Exception as general_error:
+raise AirflowException(
+'Failed to create aws glue job, error: {error}'.format(
+error=str(general_error)
+)
+)
+
+def initialize_job(self, script_arguments=None):
+"""
+Initializes connection with AWS Glue
+to run job
+:return:
+"""
+if self.s3_bucket is None:
+raise AirflowException(
+'Could not initialize glue job, '
+'error: Specify Parameter `s3_bucket`'
+)
+
+glue_client = self.get_conn()
+
+try:
+job_response = self.get_or_create_glue_job()
+job_name = job_response
+job_run = glue_client.start_job_run(
+JobName=job_name,
+Arguments=script_arguments
+)
+return self.job_completion(job_name, job_run['JobRunId'])
+except Exception as general_error:
+raise AirflowException(
+'Failed to run aws glue job, error: {error}'.format(
+ 

[GitHub] [airflow] abdulbasitds commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-03-24 Thread GitBox
abdulbasitds commented on a change in pull request #6007: [AIRFLOW-2310] Enable 
AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r397142449
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.contrib.hooks.aws_hook import AwsHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type str
+:param desc: job description
+:type str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed 
for a job
+:type int
+:param script_location: path to etl script either on s3 or local
+:type str
+:param conns: A list of connections used by the job
+:type list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be 
uploaded
+:type str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type str
+"""
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=None,
+ script_location=None,
+ conns=None,
+ retry_limit=None,
+ num_of_dpus=None,
+ aws_conn_id='aws_default',
+ region_name=None,
+ iam_role_name=None,
+ s3_bucket=None, *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit or 1
+self.script_location = script_location
+self.conns = conns or ["s3"]
+self.retry_limit = retry_limit or 0
+self.num_of_dpus = num_of_dpus or 10
+self.aws_conn_id = aws_conn_id
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.role_name = iam_role_name
+self.S3_PROTOCOL = "s3://"
+self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/'
+self.S3_GLUE_LOGS = 'logs/glue-logs/'
+super(AwsGlueJobHook, self).__init__(*args, **kwargs)
+
+def get_conn(self):
+conn = self.get_client_type('glue', self.region_name)
+return conn
 
 Review comment:
  I am sorry, but I didn't get what I should do about this for now. If it is really necessary, can you please explain how I should do it, since these changes come from another pull request?




[GitHub] [airflow] abdulbasitds commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
abdulbasitds commented on a change in pull request #6007: [AIRFLOW-2310] Enable 
AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402236060
 
 

 ##
 File path: airflow/providers/amazon/aws/sensors/glue.py
 ##
 @@ -0,0 +1,57 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.providers.amazon.aws.hooks.glue import AwsGlueJobHook
+
+
+class AwsGlueJobSensor(BaseSensorOperator):
+"""
+Waits for an AWS Glue Job to reach any of the status below
+'FAILED', 'STOPPED', 'SUCCEEDED'
+
+:param job_name: The AWS Glue Job unique name
+:type str
+:param run_id: The AWS Glue current running job identifier
+:type str
+"""
+template_fields = ('job_name', 'run_id')
+
+@apply_defaults
+def __init__(self,
+ job_name,
+ run_id,
+ aws_conn_id='aws_default',
+ *args,
+ **kwargs):
+super(AwsGlueJobSensor, self).__init__(*args, **kwargs)
+self.job_name = job_name
+self.run_id = run_id
+self.aws_conn_id = aws_conn_id
+self.targeted_status = ['FAILED', 'STOPPED', 'SUCCEEDED']
+
+def poke(self, context):
+self.log.info("Poking for job run status : {self.targeted_status}\n"
+  "for Glue Job {self.job_name} and ID {self.run_id}"
+  .format(**locals()))
+hook = AwsGlueJobHook(aws_conn_id=self.aws_conn_id)
+job_state = hook.job_completion(job_name=self.job_name,
+run_id=self.run_id)
+return job_state['JobRunState'].upper() in self.targeted_status
 
 Review comment:
  Yes. Actually, I wasn't using it before, but there was an inconsistency where one of the statuses came back in lower case; I don't remember which one it was. Hence the `.upper()` guard.

  Should I mark it resolved?
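
  For illustration, a minimal standalone sketch of that case-normalization; the status list and the `.upper()` guard mirror the `poke` method quoted above:

   ```
   # Sketch: normalize the job-run state before comparing, so a state that
   # comes back as 'succeeded' still matches the upper-case target list.
   TARGETED_STATUS = ['FAILED', 'STOPPED', 'SUCCEEDED']


   def is_finished(job_state):
       # job_state mimics the dict returned by job_completion,
       # e.g. {'JobRunState': 'succeeded'}
       return job_state['JobRunState'].upper() in TARGETED_STATUS


   print(is_finished({'JobRunState': 'succeeded'}))  # True
   ```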




[GitHub] [airflow] abdulbasitds commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
abdulbasitds commented on a change in pull request #6007: [AIRFLOW-2310] Enable 
AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402237218
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsBaseHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type job_name: str
+:param desc: job description
+:type desc: str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed 
for a job
+:type concurrent_run_limit: int
+:param script_location: path to etl script either on s3 or local
+:type script_location: str
+:param conns: A list of connections used by the job
+:type conns: list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type retry_limit: int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type num_of_dpus: int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type iam_role_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be 
uploaded
+:type s3_bucket: str
+"""
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=1,
+ script_location=None,
+ conns=None,
+ retry_limit=0,
+ num_of_dpus=10,
+ aws_conn_id='aws_default',
+ region_name=None,
+ iam_role_name=None,
+ s3_bucket=None, *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit
+self.script_location = script_location
+self.conns = conns or ["s3"]
+self.retry_limit = retry_limit
+self.num_of_dpus = num_of_dpus
+self.aws_conn_id = aws_conn_id
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.role_name = iam_role_name
+self.S3_PROTOCOL = "s3://"
+self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/'
+self.S3_GLUE_LOGS = 'logs/glue-logs/'
+super(AwsGlueJobHook, self).__init__(*args, **kwargs)
+
+def get_conn(self):
+conn = self.get_client_type('glue', self.region_name)
+return conn
+
+def list_jobs(self):
+conn = self.get_conn()
+return conn.get_jobs()
+
+def get_iam_execution_role(self):
+"""
+:return: iam role for job execution
+"""
+iam_client = self.get_client_type('iam', self.region_name)
+
+try:
+glue_execution_role = iam_client.get_role(RoleName=self.role_name)
+self.log.info("Iam Role Name: %s", self.role_name)
+return glue_execution_role
+except Exception as general_error:
+self.log.error(f'Failed to create aws glue job, error: 
{str(general_error)}')
+raise
+
+def initialize_job(self, script_arguments=None):
+"""
+Initializes connection with AWS Glue
+to run job
+:return:
+"""
+if self.s3_bucket is None:
+raise AirflowException(
+'Could not initialize glue job, '
+'error: Specify Parameter `s3_bucket`'
+)
+
+glue_client = self.get_conn()
+
+try:
+job_response = self.get_or_create_glue_job()
+job_name = job_response
+job_run = glue_client.start_job_run(
+JobName=job_name,
+Arguments=script_arguments
+)
+return self.job_completion(job_name, job_run['JobRunId'])
+except Exception as general_error:
+self.log.error(f'Failed to run aws glue job, error: 
{str(general_error)}')
+  

[GitHub] [airflow] abdulbasitds commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
abdulbasitds commented on a change in pull request #6007: [AIRFLOW-2310] Enable 
AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402237893
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsBaseHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type job_name: str
+:param desc: job description
+:type desc: str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed 
for a job
+:type concurrent_run_limit: int
+:param script_location: path to etl script either on s3 or local
+:type script_location: str
+:param conns: A list of connections used by the job
+:type conns: list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type retry_limit: int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type num_of_dpus: int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type iam_role_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be 
uploaded
+:type s3_bucket: str
+"""
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=1,
+ script_location=None,
+ conns=None,
+ retry_limit=0,
+ num_of_dpus=10,
+ aws_conn_id='aws_default',
+ region_name=None,
+ iam_role_name=None,
+ s3_bucket=None, *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit
+self.script_location = script_location
+self.conns = conns or ["s3"]
+self.retry_limit = retry_limit
+self.num_of_dpus = num_of_dpus
+self.aws_conn_id = aws_conn_id
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.role_name = iam_role_name
+self.S3_PROTOCOL = "s3://"
+self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/'
+self.S3_GLUE_LOGS = 'logs/glue-logs/'
+super(AwsGlueJobHook, self).__init__(*args, **kwargs)
+
+def get_conn(self):
+conn = self.get_client_type('glue', self.region_name)
+return conn
+
+def list_jobs(self):
+conn = self.get_conn()
+return conn.get_jobs()
+
+def get_iam_execution_role(self):
+"""
+:return: iam role for job execution
+"""
+iam_client = self.get_client_type('iam', self.region_name)
+
+try:
+glue_execution_role = iam_client.get_role(RoleName=self.role_name)
+self.log.info("Iam Role Name: %s", self.role_name)
+return glue_execution_role
+except Exception as general_error:
+self.log.error(f'Failed to create aws glue job, error: 
{str(general_error)}')
+raise
 
 Review comment:
  Hey, thanks for pointing that out. It is actually using get_client_type and get_role from AwsBaseHook, so it already benefits from that implementation.
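
  To make that call path concrete, a small sketch (the `GlueHookSketch` name is illustrative): `get_client_type` comes from `AwsBaseHook`, and `get_role` is the standard boto3 IAM call, both as in the hunk above:

   ```
   from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook


   class GlueHookSketch(AwsBaseHook):
       # Sketch: the IAM lookup reuses AwsBaseHook.get_client_type, so the
       # base hook's connection and credential handling does the work.
       def __init__(self, role_name, region_name=None, *args, **kwargs):
           self.role_name = role_name
           self.region_name = region_name
           super().__init__(*args, **kwargs)

       def get_iam_execution_role(self):
           iam_client = self.get_client_type('iam', self.region_name)
           # boto3 IAM get_role returns {'Role': {'RoleName': ..., 'Arn': ...}}
           return iam_client.get_role(RoleName=self.role_name)
   ```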




[GitHub] [airflow] abdulbasitds commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
abdulbasitds commented on a change in pull request #6007: [AIRFLOW-2310] Enable 
AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402332457
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsBaseHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type job_name: str
+:param desc: job description
+:type desc: str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed 
for a job
+:type concurrent_run_limit: int
+:param script_location: path to etl script either on s3 or local
+:type script_location: str
+:param conns: A list of connections used by the job
+:type conns: list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type retry_limit: int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type num_of_dpus: int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type iam_role_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be 
uploaded
+:type s3_bucket: str
+"""
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=1,
+ script_location=None,
+ conns=None,
+ retry_limit=0,
+ num_of_dpus=10,
+ aws_conn_id='aws_default',
+ region_name=None,
+ iam_role_name=None,
+ s3_bucket=None, *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit
+self.script_location = script_location
+self.conns = conns or ["s3"]
+self.retry_limit = retry_limit
+self.num_of_dpus = num_of_dpus
+self.aws_conn_id = aws_conn_id
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.role_name = iam_role_name
+self.S3_PROTOCOL = "s3://"
+self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/'
+self.S3_GLUE_LOGS = 'logs/glue-logs/'
+super(AwsGlueJobHook, self).__init__(*args, **kwargs)
+
+def get_conn(self):
+conn = self.get_client_type('glue', self.region_name)
+return conn
+
+def list_jobs(self):
+conn = self.get_conn()
+return conn.get_jobs()
+
+def get_iam_execution_role(self):
+"""
+:return: iam role for job execution
+"""
+iam_client = self.get_client_type('iam', self.region_name)
+
+try:
+glue_execution_role = iam_client.get_role(RoleName=self.role_name)
+self.log.info("Iam Role Name: %s", self.role_name)
+return glue_execution_role
+except Exception as general_error:
+self.log.error(f'Failed to create aws glue job, error: 
{str(general_error)}')
+raise
+
+def initialize_job(self, script_arguments=None):
+"""
+Initializes connection with AWS Glue
+to run job
+:return:
+"""
+if self.s3_bucket is None:
+raise AirflowException(
+'Could not initialize glue job, '
+'error: Specify Parameter `s3_bucket`'
+)
+
+glue_client = self.get_conn()
+
+try:
+job_response = self.get_or_create_glue_job()
+job_name = job_response
+job_run = glue_client.start_job_run(
+JobName=job_name,
+Arguments=script_arguments
+)
+return self.job_completion(job_name, job_run['JobRunId'])
+except Exception as general_error:
+self.log.error(f'Failed to run aws glue job, error: 
{str(general_error)}')
+  

[GitHub] [airflow] abdulbasitds commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
abdulbasitds commented on a change in pull request #6007: [AIRFLOW-2310] Enable 
AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402336526
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsBaseHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type job_name: str
+:param desc: job description
+:type desc: str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed 
for a job
+:type concurrent_run_limit: int
+:param script_location: path to etl script either on s3 or local
+:type script_location: str
+:param conns: A list of connections used by the job
+:type conns: list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type retry_limit: int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type num_of_dpus: int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type iam_role_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be 
uploaded
+:type s3_bucket: str
+"""
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=1,
+ script_location=None,
+ conns=None,
+ retry_limit=0,
+ num_of_dpus=10,
+ aws_conn_id='aws_default',
+ region_name=None,
+ iam_role_name=None,
+ s3_bucket=None, *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit
+self.script_location = script_location
+self.conns = conns or ["s3"]
+self.retry_limit = retry_limit
+self.num_of_dpus = num_of_dpus
+self.aws_conn_id = aws_conn_id
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.role_name = iam_role_name
+self.S3_PROTOCOL = "s3://"
+self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/'
+self.S3_GLUE_LOGS = 'logs/glue-logs/'
+super(AwsGlueJobHook, self).__init__(*args, **kwargs)
+
+def get_conn(self):
+conn = self.get_client_type('glue', self.region_name)
+return conn
+
+def list_jobs(self):
+conn = self.get_conn()
+return conn.get_jobs()
+
+def get_iam_execution_role(self):
+"""
+:return: iam role for job execution
+"""
+iam_client = self.get_client_type('iam', self.region_name)
+
+try:
+glue_execution_role = iam_client.get_role(RoleName=self.role_name)
+self.log.info("Iam Role Name: %s", self.role_name)
+return glue_execution_role
+except Exception as general_error:
+self.log.error(f'Failed to create aws glue job, error: 
{str(general_error)}')
+raise
 
 Review comment:
   "But if you cannot make it work with the functions in AwsBaseHook you can 
access it for now like you do." with boto object. as it is right now?




[GitHub] [airflow] abdulbasitds commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
abdulbasitds commented on a change in pull request #6007: [AIRFLOW-2310] Enable 
AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402390226
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+import os.path
+import time
+
+
+class AwsGlueJobHook(AwsBaseHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type job_name: str
+:param desc: job description
+:type desc: str
+:param concurrent_run_limit: The maximum number of concurrent runs allowed 
for a job
+:type concurrent_run_limit: int
+:param script_location: path to etl script either on s3 or local
+:type script_location: str
+:param conns: A list of connections used by the job
+:type conns: list
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type retry_limit: int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type num_of_dpus: int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param iam_role_name: AWS IAM Role for Glue Job
+:type iam_role_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be 
uploaded
+:type s3_bucket: str
+"""
+
+def __init__(self,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=1,
+ script_location=None,
+ conns=None,
+ retry_limit=0,
+ num_of_dpus=10,
+ aws_conn_id='aws_default',
+ region_name=None,
+ iam_role_name=None,
+ s3_bucket=None, *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit
+self.script_location = script_location
+self.conns = conns or ["s3"]
+self.retry_limit = retry_limit
+self.num_of_dpus = num_of_dpus
+self.aws_conn_id = aws_conn_id
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.role_name = iam_role_name
+self.S3_PROTOCOL = "s3://"
+self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/'
+self.S3_GLUE_LOGS = 'logs/glue-logs/'
+super(AwsGlueJobHook, self).__init__(*args, **kwargs)
+
+def get_conn(self):
+conn = self.get_client_type('glue', self.region_name)
+return conn
+
+def list_jobs(self):
+conn = self.get_conn()
+return conn.get_jobs()
+
+def get_iam_execution_role(self):
+"""
+:return: iam role for job execution
+"""
+iam_client = self.get_client_type('iam', self.region_name)
+
+try:
+glue_execution_role = iam_client.get_role(RoleName=self.role_name)
+self.log.info("Iam Role Name: %s", self.role_name)
+return glue_execution_role
+except Exception as general_error:
+self.log.error(f'Failed to create aws glue job, error: 
{str(general_error)}')
+raise
+
+def initialize_job(self, script_arguments=None):
+"""
+Initializes connection with AWS Glue
+to run job
+:return:
+"""
+if self.s3_bucket is None:
+raise AirflowException(
+'Could not initialize glue job, '
+'error: Specify Parameter `s3_bucket`'
+)
+
+glue_client = self.get_conn()
+
+try:
+job_response = self.get_or_create_glue_job()
+job_name = job_response
+job_run = glue_client.start_job_run(
+JobName=job_name,
+Arguments=script_arguments
+)
+return self.job_completion(job_name, job_run['JobRunId'])
+except Exception as general_error:
+self.log.error(f'Failed to run aws glue job, error: 
{str(general_error)}')
+  

[GitHub] [airflow] abdulbasitds commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-04-02 Thread GitBox
abdulbasitds commented on a change in pull request #6007: [AIRFLOW-2310] Enable 
AWS Glue Job Integration
URL: https://github.com/apache/airflow/pull/6007#discussion_r402432506
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/glue.py
 ##
 @@ -0,0 +1,172 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+import time
+
+
+class AwsGlueJobHook(AwsBaseHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type job_name: Optional[str]
+:param desc: job description
+:type desc: Optional[str]
+:param concurrent_run_limit: The maximum number of concurrent runs allowed 
for a job
+:type concurrent_run_limit: int
+:param script_location: path to etl script either on s3 or local
 
 Review comment:
  @feluelle But aren't we still uploading the file to S3 if it is not already there? So we have both options; see the sketch below.
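
  A minimal sketch of that both-options behaviour, assuming the `S3Hook.load_file` API from the providers package; the bucket and prefix values are illustrative:

   ```
   import os.path

   from airflow.providers.amazon.aws.hooks.s3 import S3Hook

   S3_PROTOCOL = "s3://"
   S3_ARTIFACTS_PREFIX = "artifacts/glue-scripts/"


   def ensure_script_on_s3(script_location, s3_bucket, aws_conn_id="aws_default"):
       # If the script is already on S3, use it as-is; otherwise upload the
       # local file and return the resulting S3 path.
       if script_location.startswith(S3_PROTOCOL):
           return script_location
       s3_hook = S3Hook(aws_conn_id=aws_conn_id)
       script_name = os.path.basename(script_location)
       s3_hook.load_file(
           filename=script_location,
           key=S3_ARTIFACTS_PREFIX + script_name,
           bucket_name=s3_bucket,
       )
       return "{}{}/{}{}".format(S3_PROTOCOL, s3_bucket,
                                 S3_ARTIFACTS_PREFIX, script_name)
   ```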
   




[GitHub] [airflow] abdulbasitds commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-05-07 Thread GitBox


abdulbasitds commented on a change in pull request #6007:
URL: https://github.com/apache/airflow/pull/6007#discussion_r421971459



##
File path: airflow/providers/amazon/aws/operators/glue.py
##
@@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import unicode_literals
+
+from airflow.providers.amazon.aws.hooks.glue import AwsGlueJobHook
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+import os.path
+
+
+class AWSGlueJobOperator(BaseOperator):
+"""
+Creates an AWS Glue Job. AWS Glue is a serverless Spark
+ETL service for running Spark Jobs on the AWS cloud.
+Language support: Python and Scala
+
+:param job_name: unique job name per AWS Account
+:type job_name: Optional[str]
+:param script_location: location of ETL script. Must be a local or S3 path
+:type script_location: Optional[str]
+:param job_desc: job description details
+:type job_desc: Optional[str]
+:param concurrent_run_limit: The maximum number of concurrent runs allowed 
for a job
+:type concurrent_run_limit: Optional[int]
+:param script_args: etl script arguments and AWS Glue arguments
+:type script_args: dict
+:param connections: AWS Glue connections to be used by the job.
+:type connections: list
+:param retry_limit: The maximum number of times to retry this job if it 
fails
+:type retry_limit:Optional[int]
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job.
+:type num_of_dpus: int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: str
+:param s3_bucket: S3 bucket where logs and local etl script will be 
uploaded
+:type s3_bucket: Optional[str]
+:param iam_role_name: AWS IAM Role for Glue Job Execution
+:type iam_role_name: Optional[str]
+"""
+template_fields = ()
+template_ext = ()
+ui_color = '#ededed'
+
+@apply_defaults
+def __init__(self,
+ job_name='aws_glue_default_job',
+ job_desc='AWS Glue Job with Airflow',
+ script_location=None,
+ concurrent_run_limit=None,
+ script_args=None,
+ connections=None,
+ retry_limit=None,
+ num_of_dpus=6,
+ aws_conn_id='aws_default',
+ region_name=None,
+ s3_bucket=None,
+ iam_role_name=None,
+ *args, **kwargs
+ ):
+super(AWSGlueJobOperator, self).__init__(*args, **kwargs)
+self.job_name = job_name
+self.job_desc = job_desc
+self.script_location = script_location
+self.concurrent_run_limit = concurrent_run_limit
+self.script_args = script_args or {}
+self.connections = connections or []
+self.retry_limit = retry_limit
+self.num_of_dpus = num_of_dpus
+self.aws_conn_id = aws_conn_id,
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.iam_role_name = iam_role_name
+self.S3_PROTOCOL = "s3://"
+self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/'
+
+def execute(self, context):
+"""
+Executes AWS Glue Job from Airflow
+
+:return: the id of the current glue job.
+"""
+if not self.script_location.startsWith(self.S3_PROTOCOL):

Review comment:
   @zachliu
   I have made this change. 
   @feluelle  scrpt_location is already Optional[str]
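
  For reference, a tiny self-contained sketch of the spelling fix; `startswith` is the built-in `str` method, while `startsWith` would raise `AttributeError` at runtime:

   ```
   # Sketch: str.startswith is the correct (all lower-case) predicate.
   def is_s3_path(script_location, s3_protocol="s3://"):
       return script_location.startswith(s3_protocol)


   print(is_s3_path("s3://my-bucket/etl.py"))  # True
   print(is_s3_path("/tmp/etl.py"))            # False
   ```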









[GitHub] [airflow] abdulbasitds commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-05-09 Thread GitBox


abdulbasitds commented on a change in pull request #6007:
URL: https://github.com/apache/airflow/pull/6007#discussion_r42252



##
File path: tests/providers/amazon/aws/hooks/test_aws_glue_job_hook.py
##
@@ -0,0 +1,107 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# TODO: This license is not consistent with license used in the project.
+#   Delete the inconsistent license and above line and rerun pre-commit to insert a good license.
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.

Review comment:
  Hello @kaxil, thank you for your comments and modifications, really helpful.
  One question: I did run `pre-commit run`, but this wasn't changed automatically. Am I doing something wrong?









[GitHub] [airflow] abdulbasitds commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-05-10 Thread GitBox


abdulbasitds commented on a change in pull request #6007:
URL: https://github.com/apache/airflow/pull/6007#discussion_r422661617



##
File path: airflow/providers/amazon/aws/hooks/glue.py
##
@@ -0,0 +1,180 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import time
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class AwsGlueJobHook(AwsBaseHook):
+"""
+Interact with AWS Glue - create job, trigger, crawler
+
+:param job_name: unique job name per AWS account
+:type job_name: Optional[str]
+:param desc: job description
+:type desc: Optional[str]
+:param concurrent_run_limit: The maximum number of concurrent runs allowed 
for a job
+:type concurrent_run_limit: int
+:param script_location: path to etl script on s3
+:type script_location: Optional[str]
+:param retry_limit: Maximum number of times to retry this job if it fails
+:type retry_limit: int
+:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
+:type num_of_dpus: int
+:param region_name: aws region name (example: us-east-1)
+:type region_name: Optional[str]
+:param iam_role_name: AWS IAM Role for Glue Job
+:type iam_role_name: Optional[str]
+:param s3_bucket: S3 bucket where logs and local etl script will be 
uploaded
+:type s3_bucket: Optional[str]
+"""
+JOB_POLL_INTERVAL = 6  # polls job status after every JOB_POLL_INTERVAL 
seconds
+
+def __init__(self,
+ s3_bucket=None,
+ job_name=None,
+ desc=None,
+ concurrent_run_limit=1,
+ script_location=None,
+ retry_limit=0,
+ num_of_dpus=10,
+ region_name=None,
+ iam_role_name=None,
+ *args, **kwargs):
+self.job_name = job_name
+self.desc = desc
+self.concurrent_run_limit = concurrent_run_limit
+self.script_location = script_location
+self.retry_limit = retry_limit
+self.num_of_dpus = num_of_dpus
+self.region_name = region_name
+self.s3_bucket = s3_bucket
+self.role_name = iam_role_name
+self.s3_glue_logs = 'logs/glue-logs/'
+super(AwsGlueJobHook, self).__init__(client_type='glue', *args, 
**kwargs)
+
+def get_conn(self):
+"""
+:return: connection
+"""
+conn = self.get_client_type('glue', self.region_name)
+return conn

Review comment:
  @feluelle Okay, I will push it with the other changes (about the test that is failing); I am currently trying to figure that out.
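
  A small sketch of the simplification this enables, assuming (as passing `client_type='glue'` to `super().__init__` suggests) that `AwsBaseHook` can build the Glue client itself, which would make the `get_conn` override above redundant:

   ```
   from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook


   class AwsGlueJobHook(AwsBaseHook):
       def __init__(self, region_name=None, *args, **kwargs):
           self.region_name = region_name
           # With client_type set here, the base hook knows which boto3
           # client to create, so no Glue-specific get_conn is needed.
           super().__init__(client_type='glue', *args, **kwargs)
   ```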








