Repository: incubator-airflow Updated Branches: refs/heads/master a920035a9 -> f3af6f44e
[AIRFLOW-96] s3_conn_id using environment variable Dear Airflow Maintainers, Please accept this PR that addresses the following issues: - [AIRFLOW-96](https://issues.apache.org/jira/browse/AIRFLOW-96) : allow parameter "s3_conn_id" of S3KeySensor and S3PrefixSensor to be defined using an environment variable. Actually, S3KeySensor and S3PrefixSensor use the S3Hook, which extends BaseHook. BaseHook has get_connection, which looks up a connection: - in environment variables first - and then in the database Closes #1517 from dm-tran/fix-jira-airflow-96 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f3af6f44 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f3af6f44 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f3af6f44 Branch: refs/heads/master Commit: f3af6f44eb4bb3fe0994d42ecc046abb314e4d63 Parents: a920035 Author: Duy-Minh TRAN <duyminh.t...@adotmob.com> Authored: Thu Oct 20 21:30:34 2016 +0530 Committer: Sumit Maheshwari <sum...@qubole.com> Committed: Thu Oct 20 21:30:34 2016 +0530 ---------------------------------------------------------------------- airflow/operators/sensors.py | 14 +------------- docs/concepts.rst | 2 +- 2 files changed, 2 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3af6f44/airflow/operators/sensors.py ---------------------------------------------------------------------- diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py index 570b682..58040bc 100644 --- a/airflow/operators/sensors.py +++ b/airflow/operators/sensors.py @@ -26,7 +26,7 @@ from time import sleep import airflow from airflow import hooks, settings from airflow.exceptions import AirflowException, AirflowSensorTimeout, AirflowSkipException -from airflow.models import BaseOperator, TaskInstance, Connection as 
DB +from airflow.models import BaseOperator, TaskInstance from airflow.hooks.base_hook import BaseHook from airflow.utils.state import State from airflow.utils.decorators import apply_defaults @@ -446,10 +446,6 @@ class S3KeySensor(BaseSensorOperator): s3_conn_id='s3_default', *args, **kwargs): super(S3KeySensor, self).__init__(*args, **kwargs) - session = settings.Session() - db = session.query(DB).filter(DB.conn_id == s3_conn_id).first() - if not db: - raise AirflowException("conn_id doesn't exist in the repository") # Parse if bucket_name is None: parsed_url = urlparse(bucket_key) @@ -465,8 +461,6 @@ class S3KeySensor(BaseSensorOperator): self.bucket_key = bucket_key self.wildcard_match = wildcard_match self.s3_conn_id = s3_conn_id - session.commit() - session.close() def poke(self, context): import airflow.hooks.S3_hook @@ -506,18 +500,12 @@ class S3PrefixSensor(BaseSensorOperator): s3_conn_id='s3_default', *args, **kwargs): super(S3PrefixSensor, self).__init__(*args, **kwargs) - session = settings.Session() - db = session.query(DB).filter(DB.conn_id == s3_conn_id).first() - if not db: - raise AirflowException("conn_id doesn't exist in the repository") # Parse self.bucket_name = bucket_name self.prefix = prefix self.delimiter = delimiter self.full_url = "s3://" + bucket_name + '/' + prefix self.s3_conn_id = s3_conn_id - session.commit() - session.close() def poke(self, context): logging.info('Poking for prefix : {self.prefix}\n' http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3af6f44/docs/concepts.rst ---------------------------------------------------------------------- diff --git a/docs/concepts.rst b/docs/concepts.rst index e27de26..9f65256 100644 --- a/docs/concepts.rst +++ b/docs/concepts.rst @@ -333,7 +333,7 @@ is named ``postgres_master`` the environment variable should be named ``AIRFLOW_CONN_POSTGRES_MASTER`` (note that the environment variable must be all uppercase). 
Airflow assumes the value returned from the environment variable to be in a URI format (e.g. -``postgres://user:password@localhost:5432/master``). +``postgres://user:password@localhost:5432/master`` or ``s3://accesskey:secretkey@S3``). Queues ======