http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/blob/11437c14/_modules/airflow/operators/sensors.html ---------------------------------------------------------------------- diff --git a/_modules/airflow/operators/sensors.html b/_modules/airflow/operators/sensors.html deleted file mode 100644 index 7414daf..0000000 --- a/_modules/airflow/operators/sensors.html +++ /dev/null @@ -1,929 +0,0 @@ - - -<!DOCTYPE html> -<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]--> -<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]--> -<head> - <meta charset="utf-8"> - - <meta name="viewport" content="width=device-width, initial-scale=1.0"> - - <title>airflow.operators.sensors — Airflow Documentation</title> - - - - - - - - - - - - - - - - - - <link rel="stylesheet" href="../../../_static/css/theme.css" type="text/css" /> - - - - - - <link rel="index" title="Index" - href="../../../genindex.html"/> - <link rel="search" title="Search" href="../../../search.html"/> - <link rel="top" title="Airflow Documentation" href="../../../index.html"/> - <link rel="up" title="Module code" href="../../index.html"/> - - - <script src="../../../_static/js/modernizr.min.js"></script> - -</head> - -<body class="wy-body-for-nav" role="document"> - - - <div class="wy-grid-for-nav"> - - - <nav data-toggle="wy-nav-shift" class="wy-nav-side"> - <div class="wy-side-scroll"> - <div class="wy-side-nav-search"> - - - - <a href="../../../index.html" class="icon icon-home"> Airflow - - - - </a> - - - - - - - -<div role="search"> - <form id="rtd-search-form" class="wy-form" action="../../../search.html" method="get"> - <input type="text" name="q" placeholder="Search docs" /> - <input type="hidden" name="check_keywords" value="yes" /> - <input type="hidden" name="area" value="default" /> - </form> -</div> - - - </div> - - <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation"> - - - - - - - <ul> -<li class="toctree-l1"><a class="reference internal" href="../../../project.html">Project</a></li> -<li class="toctree-l1"><a class="reference internal" href="../../../license.html">License</a></li> -<li class="toctree-l1"><a class="reference internal" href="../../../start.html">Quick Start</a></li> -<li class="toctree-l1"><a class="reference internal" href="../../../installation.html">Installation</a></li> -<li class="toctree-l1"><a class="reference internal" href="../../../tutorial.html">Tutorial</a></li> -<li class="toctree-l1"><a class="reference internal" href="../../../configuration.html">Configuration</a></li> -<li class="toctree-l1"><a class="reference internal" href="../../../ui.html">UI / Screenshots</a></li> -<li class="toctree-l1"><a class="reference internal" href="../../../concepts.html">Concepts</a></li> -<li class="toctree-l1"><a class="reference internal" href="../../../profiling.html">Data Profiling</a></li> -<li class="toctree-l1"><a class="reference internal" href="../../../cli.html">Command Line Interface</a></li> -<li class="toctree-l1"><a class="reference internal" href="../../../scheduler.html">Scheduling & Triggers</a></li> -<li class="toctree-l1"><a class="reference internal" href="../../../plugins.html">Plugins</a></li> -<li class="toctree-l1"><a class="reference internal" href="../../../security.html">Security</a></li> -<li class="toctree-l1"><a class="reference internal" href="../../../api.html">Experimental Rest API</a></li> -<li class="toctree-l1"><a class="reference internal" href="../../../integration.html">Integration</a></li> -<li class="toctree-l1"><a class="reference internal" href="../../../faq.html">FAQ</a></li> -<li class="toctree-l1"><a class="reference internal" href="../../../code.html">API Reference</a></li> -</ul> - - - - </div> - </div> - </nav> - - <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap"> - - - <nav class="wy-nav-top" role="navigation" aria-label="top navigation"> - - <i data-toggle="wy-nav-top" class="fa fa-bars"></i> - <a href="../../../index.html">Airflow</a> - - </nav> - - - - <div class="wy-nav-content"> - <div class="rst-content"> - - - - - - - - - - - - - - - - -<div role="navigation" aria-label="breadcrumbs navigation"> - - <ul class="wy-breadcrumbs"> - - <li><a href="../../../index.html">Docs</a> »</li> - - <li><a href="../../index.html">Module code</a> »</li> - - <li>airflow.operators.sensors</li> - - - <li class="wy-breadcrumbs-aside"> - - - - </li> - - </ul> - - - <hr/> -</div> - <div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article"> - <div itemprop="articleBody"> - - <h1>Source code for airflow.operators.sensors</h1><div class="highlight"><pre> -<span></span><span class="c1"># -*- coding: utf-8 -*-</span> -<span class="c1">#</span> -<span class="c1"># Licensed under the Apache License, Version 2.0 (the "License");</span> -<span class="c1"># you may not use this file except in compliance with the License.</span> -<span class="c1"># You may obtain a copy of the License at</span> -<span class="c1">#</span> -<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span> -<span class="c1">#</span> -<span class="c1"># Unless required by applicable law or agreed to in writing, software</span> -<span class="c1"># distributed under the License is distributed on an "AS IS" BASIS,</span> -<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span> -<span class="c1"># See the License for the specific language governing permissions and</span> -<span class="c1"># limitations under the License.</span> - -<span class="kn">from</span> <span class="nn">__future__</span> <span class="k">import</span> <span class="n">print_function</span> -<span class="kn">from</span> <span class="nn">future</span> <span class="k">import</span> <span class="n">standard_library</span> - -<span class="kn">from</span> <span class="nn">airflow.utils.log.logging_mixin</span> <span class="k">import</span> <span class="n">LoggingMixin</span> - -<span class="n">standard_library</span><span class="o">.</span><span class="n">install_aliases</span><span class="p">()</span> -<span class="kn">from</span> <span class="nn">builtins</span> <span class="k">import</span> <span class="nb">str</span> -<span class="kn">from</span> <span class="nn">past.builtins</span> <span class="k">import</span> <span class="n">basestring</span> - -<span class="kn">from</span> <span class="nn">datetime</span> <span class="k">import</span> <span class="n">datetime</span> -<span class="kn">from</span> <span class="nn">urllib.parse</span> <span class="k">import</span> <span class="n">urlparse</span> -<span class="kn">from</span> <span class="nn">time</span> <span class="k">import</span> <span class="n">sleep</span> -<span class="kn">import</span> <span class="nn">re</span> -<span class="kn">import</span> <span class="nn">sys</span> - -<span class="kn">from</span> <span class="nn">airflow</span> <span class="k">import</span> <span class="n">settings</span> -<span class="kn">from</span> <span class="nn">airflow.exceptions</span> <span class="k">import</span> <span class="n">AirflowException</span><span class="p">,</span> <span class="n">AirflowSensorTimeout</span><span class="p">,</span> <span class="n">AirflowSkipException</span> -<span class="kn">from</span> <span class="nn">airflow.models</span> <span class="k">import</span> <span class="n">BaseOperator</span><span class="p">,</span> <span class="n">TaskInstance</span> -<span class="kn">from</span> <span class="nn">airflow.hooks.base_hook</span> <span class="k">import</span> <span class="n">BaseHook</span> -<span class="kn">from</span> <span class="nn">airflow.hooks.hdfs_hook</span> <span class="k">import</span> <span class="n">HDFSHook</span> -<span class="kn">from</span> <span class="nn">airflow.hooks.http_hook</span> <span class="k">import</span> <span class="n">HttpHook</span> -<span class="kn">from</span> <span class="nn">airflow.utils.state</span> <span class="k">import</span> <span class="n">State</span> -<span class="kn">from</span> <span class="nn">airflow.utils.decorators</span> <span class="k">import</span> <span class="n">apply_defaults</span> - - -<div class="viewcode-block" id="BaseSensorOperator"><a class="viewcode-back" href="../../../code.html#airflow.operators.sensors.BaseSensorOperator">[docs]</a><span class="k">class</span> <span class="nc">BaseSensorOperator</span><span class="p">(</span><span class="n">BaseOperator</span><span class="p">):</span> - <span class="sd">'''</span> -<span class="sd"> Sensor operators are derived from this class an inherit these attributes.</span> - -<span class="sd"> Sensor operators keep executing at a time interval and succeed when</span> -<span class="sd"> a criteria is met and fail if and when they time out.</span> - -<span class="sd"> :param soft_fail: Set to true to mark the task as SKIPPED on failure</span> -<span class="sd"> :type soft_fail: bool</span> -<span class="sd"> :param poke_interval: Time in seconds that the job should wait in</span> -<span class="sd"> between each tries</span> -<span class="sd"> :type poke_interval: int</span> -<span class="sd"> :param timeout: Time, in seconds before the task times out and fails.</span> -<span class="sd"> :type timeout: int</span> -<span class="sd"> '''</span> - <span class="n">ui_color</span> <span class="o">=</span> <span class="s1">'#e6f1f2'</span> - - <span class="nd">@apply_defaults</span> - <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span> - <span class="bp">self</span><span class="p">,</span> - <span class="n">poke_interval</span><span class="o">=</span><span class="mi">60</span><span class="p">,</span> - <span class="n">timeout</span><span class="o">=</span><span class="mi">60</span><span class="o">*</span><span class="mi">60</span><span class="o">*</span><span class="mi">24</span><span class="o">*</span><span class="mi">7</span><span class="p">,</span> - <span class="n">soft_fail</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> - <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> - <span class="nb">super</span><span class="p">(</span><span class="n">BaseSensorOperator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> - <span class="bp">self</span><span class="o">.</span><span class="n">poke_interval</span> <span class="o">=</span> <span class="n">poke_interval</span> - <span class="bp">self</span><span class="o">.</span><span class="n">soft_fail</span> <span class="o">=</span> <span class="n">soft_fail</span> - <span class="bp">self</span><span class="o">.</span><span class="n">timeout</span> <span class="o">=</span> <span class="n">timeout</span> - - <span class="k">def</span> <span class="nf">poke</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span> - <span class="sd">'''</span> -<span class="sd"> Function that the sensors defined while deriving this class should</span> -<span class="sd"> override.</span> -<span class="sd"> '''</span> - <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span><span class="s1">'Override me.'</span><span class="p">)</span> - - <span class="k">def</span> <span class="nf">execute</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span> - <span class="n">started_at</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span> - <span class="k">while</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">poke</span><span class="p">(</span><span class="n">context</span><span class="p">):</span> - <span class="k">if</span> <span class="p">(</span><span class="n">datetime</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span> <span class="o">-</span> <span class="n">started_at</span><span class="p">)</span><span class="o">.</span><span class="n">total_seconds</span><span class="p">()</span> <span class="o">></span> <span class="bp">self</span><span class="o">.</span><span class="n">timeout</span><span class="p">:</span> - <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">soft_fail</span><span class="p">:</span> - <span class="k">raise</span> <span class="n">AirflowSkipException</span><span class="p">(</span><span class="s1">'Snap. Time is OUT.'</span><span class="p">)</span> - <span class="k">else</span><span class="p">:</span> - <span class="k">raise</span> <span class="n">AirflowSensorTimeout</span><span class="p">(</span><span class="s1">'Snap. Time is OUT.'</span><span class="p">)</span> - <span class="n">sleep</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">poke_interval</span><span class="p">)</span> - <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Success criteria met. Exiting."</span><span class="p">)</span></div> - - -<span class="k">class</span> <span class="nc">SqlSensor</span><span class="p">(</span><span class="n">BaseSensorOperator</span><span class="p">):</span> - <span class="sd">"""</span> -<span class="sd"> Runs a sql statement until a criteria is met. It will keep trying while</span> -<span class="sd"> sql returns no row, or if the first cell in (0, '0', '').</span> - -<span class="sd"> :param conn_id: The connection to run the sensor against</span> -<span class="sd"> :type conn_id: string</span> -<span class="sd"> :param sql: The sql to run. To pass, it needs to return at least one cell</span> -<span class="sd"> that contains a non-zero / empty string value.</span> -<span class="sd"> """</span> - <span class="n">template_fields</span> <span class="o">=</span> <span class="p">(</span><span class="s1">'sql'</span><span class="p">,)</span> - <span class="n">template_ext</span> <span class="o">=</span> <span class="p">(</span><span class="s1">'.hql'</span><span class="p">,</span> <span class="s1">'.sql'</span><span class="p">,)</span> - <span class="n">ui_color</span> <span class="o">=</span> <span class="s1">'#7c7287'</span> - - <span class="nd">@apply_defaults</span> - <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">conn_id</span><span class="p">,</span> <span class="n">sql</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> - <span class="bp">self</span><span class="o">.</span><span class="n">sql</span> <span class="o">=</span> <span class="n">sql</span> - <span class="bp">self</span><span class="o">.</span><span class="n">conn_id</span> <span class="o">=</span> <span class="n">conn_id</span> - <span class="nb">super</span><span class="p">(</span><span class="n">SqlSensor</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> - - <span class="k">def</span> <span class="nf">poke</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span> - <span class="n">hook</span> <span class="o">=</span> <span class="n">BaseHook</span><span class="o">.</span><span class="n">get_connection</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">conn_id</span><span class="p">)</span><span class="o">.</span><span class="n">get_hook</span><span class="p">()</span> - - <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Poking: </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">sql</span><span class="p">)</span> - <span class="n">records</span> <span class="o">=</span> <span class="n">hook</span><span class="o">.</span><span class="n">get_records</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sql</span><span class="p">)</span> - <span class="k">if</span> <span class="ow">not</span> <span class="n">records</span><span class="p">:</span> - <span class="k">return</span> <span class="kc">False</span> - <span class="k">else</span><span class="p">:</span> - <span class="k">if</span> <span class="nb">str</span><span class="p">(</span><span class="n">records</span><span class="p">[</span><span class="mi">0</span><span class="p">][</span><span class="mi">0</span><span class="p">])</span> <span class="ow">in</span> <span class="p">(</span><span class="s1">'0'</span><span class="p">,</span> <span class="s1">''</span><span class="p">,):</span> - <span class="k">return</span> <span class="kc">False</span> - <span class="k">else</span><span class="p">:</span> - <span class="k">return</span> <span class="kc">True</span> - - -<span class="k">class</span> <span class="nc">MetastorePartitionSensor</span><span class="p">(</span><span class="n">SqlSensor</span><span class="p">):</span> - <span class="sd">"""</span> -<span class="sd"> An alternative to the HivePartitionSensor that talk directly to the</span> -<span class="sd"> MySQL db. This was created as a result of observing sub optimal</span> -<span class="sd"> queries generated by the Metastore thrift service when hitting</span> -<span class="sd"> subpartitioned tables. The Thrift service's queries were written in a</span> -<span class="sd"> way that wouldn't leverage the indexes.</span> - -<span class="sd"> :param schema: the schema</span> -<span class="sd"> :type schema: str</span> -<span class="sd"> :param table: the table</span> -<span class="sd"> :type table: str</span> -<span class="sd"> :param partition_name: the partition name, as defined in the PARTITIONS</span> -<span class="sd"> table of the Metastore. Order of the fields does matter.</span> -<span class="sd"> Examples: ``ds=2016-01-01`` or</span> -<span class="sd"> ``ds=2016-01-01/sub=foo`` for a sub partitioned table</span> -<span class="sd"> :type partition_name: str</span> -<span class="sd"> :param mysql_conn_id: a reference to the MySQL conn_id for the metastore</span> -<span class="sd"> :type mysql_conn_id: str</span> -<span class="sd"> """</span> - <span class="n">template_fields</span> <span class="o">=</span> <span class="p">(</span><span class="s1">'partition_name'</span><span class="p">,</span> <span class="s1">'table'</span><span class="p">,</span> <span class="s1">'schema'</span><span class="p">)</span> - <span class="n">ui_color</span> <span class="o">=</span> <span class="s1">'#8da7be'</span> - - <span class="nd">@apply_defaults</span> - <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span> - <span class="bp">self</span><span class="p">,</span> <span class="n">table</span><span class="p">,</span> <span class="n">partition_name</span><span class="p">,</span> <span class="n">schema</span><span class="o">=</span><span class="s2">"default"</span><span class="p">,</span> - <span class="n">mysql_conn_id</span><span class="o">=</span><span class="s2">"metastore_mysql"</span><span class="p">,</span> - <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> - - <span class="bp">self</span><span class="o">.</span><span class="n">partition_name</span> <span class="o">=</span> <span class="n">partition_name</span> - <span class="bp">self</span><span class="o">.</span><span class="n">table</span> <span class="o">=</span> <span class="n">table</span> - <span class="bp">self</span><span class="o">.</span><span class="n">schema</span> <span class="o">=</span> <span class="n">schema</span> - <span class="bp">self</span><span class="o">.</span><span class="n">first_poke</span> <span class="o">=</span> <span class="kc">True</span> - <span class="bp">self</span><span class="o">.</span><span class="n">conn_id</span> <span class="o">=</span> <span class="n">mysql_conn_id</span> - <span class="c1"># TODO(aoen): We shouldn't be using SqlSensor here but MetastorePartitionSensor.</span> - <span class="c1"># The problem is the way apply_defaults works isn't compatible with inheritance.</span> - <span class="c1"># The inheritance model needs to be reworked in order to support overriding args/</span> - <span class="c1"># kwargs with arguments here, then 'conn_id' and 'sql' can be passed into the</span> - <span class="c1"># constructor below and apply_defaults will no longer throw an exception.</span> - <span class="nb">super</span><span class="p">(</span><span class="n">SqlSensor</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> - - <span class="k">def</span> <span class="nf">poke</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span> - <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">first_poke</span><span class="p">:</span> - <span class="bp">self</span><span class="o">.</span><span class="n">first_poke</span> <span class="o">=</span> <span class="kc">False</span> - <span class="k">if</span> <span class="s1">'.'</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">table</span><span class="p">:</span> - <span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">table</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">table</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">'.'</span><span class="p">)</span> - <span class="bp">self</span><span class="o">.</span><span class="n">sql</span> <span class="o">=</span> <span class="s2">"""</span> -<span class="s2"> SELECT 'X'</span> -<span class="s2"> FROM PARTITIONS A0</span> -<span class="s2"> LEFT OUTER JOIN TBLS B0 ON A0.TBL_ID = B0.TBL_ID</span> -<span class="s2"> LEFT OUTER JOIN DBS C0 ON B0.DB_ID = C0.DB_ID</span> -<span class="s2"> WHERE</span> -<span class="s2"> B0.TBL_NAME = '</span><span class="si">{self.table}</span><span class="s2">' AND</span> -<span class="s2"> C0.NAME = '</span><span class="si">{self.schema}</span><span class="s2">' AND</span> -<span class="s2"> A0.PART_NAME = '</span><span class="si">{self.partition_name}</span><span class="s2">';</span> -<span class="s2"> """</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="bp">self</span><span class="o">=</span><span class="bp">self</span><span class="p">)</span> - <span class="k">return</span> <span class="nb">super</span><span class="p">(</span><span class="n">MetastorePartitionSensor</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="n">poke</span><span class="p">(</span><span class="n">context</span><span class="p">)</span> - - -<span class="k">class</span> <span class="nc">ExternalTaskSensor</span><span class="p">(</span><span class="n">BaseSensorOperator</span><span class="p">):</span> - <span class="sd">"""</span> -<span class="sd"> Waits for a task to complete in a different DAG</span> - -<span class="sd"> :param external_dag_id: The dag_id that contains the task you want to</span> -<span class="sd"> wait for</span> -<span class="sd"> :type external_dag_id: string</span> -<span class="sd"> :param external_task_id: The task_id that contains the task you want to</span> -<span class="sd"> wait for</span> -<span class="sd"> :type external_task_id: string</span> -<span class="sd"> :param allowed_states: list of allowed states, default is ``['success']``</span> -<span class="sd"> :type allowed_states: list</span> -<span class="sd"> :param execution_delta: time difference with the previous execution to</span> -<span class="sd"> look at, the default is the same execution_date as the current task.</span> -<span class="sd"> For yesterday, use [positive!] datetime.timedelta(days=1). Either</span> -<span class="sd"> execution_delta or execution_date_fn can be passed to</span> -<span class="sd"> ExternalTaskSensor, but not both.</span> -<span class="sd"> :type execution_delta: datetime.timedelta</span> -<span class="sd"> :param execution_date_fn: function that receives the current execution date</span> -<span class="sd"> and returns the desired execution dates to query. Either execution_delta</span> -<span class="sd"> or execution_date_fn can be passed to ExternalTaskSensor, but not both.</span> -<span class="sd"> :type execution_date_fn: callable</span> -<span class="sd"> """</span> - <span class="n">ui_color</span> <span class="o">=</span> <span class="s1">'#19647e'</span> - - <span class="nd">@apply_defaults</span> - <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span> - <span class="bp">self</span><span class="p">,</span> - <span class="n">external_dag_id</span><span class="p">,</span> - <span class="n">external_task_id</span><span class="p">,</span> - <span class="n">allowed_states</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> - <span class="n">execution_delta</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> - <span class="n">execution_date_fn</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> - <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> - <span class="nb">super</span><span class="p">(</span><span class="n">ExternalTaskSensor</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> - <span class="bp">self</span><span class="o">.</span><span class="n">allowed_states</span> <span class="o">=</span> <span class="n">allowed_states</span> <span class="ow">or</span> <span class="p">[</span><span class="n">State</span><span class="o">.</span><span class="n">SUCCESS</span><span class="p">]</span> - <span class="k">if</span> <span class="n">execution_delta</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> <span class="ow">and</span> <span class="n">execution_date_fn</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> - <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> - <span class="s1">'Only one of `execution_date` or `execution_date_fn` may'</span> - <span class="s1">'be provided to ExternalTaskSensor; not both.'</span><span class="p">)</span> - - <span class="bp">self</span><span class="o">.</span><span class="n">execution_delta</span> <span class="o">=</span> <span class="n">execution_delta</span> - <span class="bp">self</span><span class="o">.</span><span class="n">execution_date_fn</span> <span class="o">=</span> <span class="n">execution_date_fn</span> - <span class="bp">self</span><span class="o">.</span><span class="n">external_dag_id</span> <span class="o">=</span> <span class="n">external_dag_id</span> - <span class="bp">self</span><span class="o">.</span><span class="n">external_task_id</span> <span class="o">=</span> <span class="n">external_task_id</span> - - <span class="k">def</span> <span class="nf">poke</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span> - <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_delta</span><span class="p">:</span> - <span class="n">dttm</span> <span class="o">=</span> <span class="n">context</span><span class="p">[</span><span class="s1">'execution_date'</span><span class="p">]</span> <span class="o">-</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_delta</span> - <span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date_fn</span><span class="p">:</span> - <span class="n">dttm</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date_fn</span><span class="p">(</span><span class="n">context</span><span class="p">[</span><span class="s1">'execution_date'</span><span class="p">])</span> - <span class="k">else</span><span class="p">:</span> - <span class="n">dttm</span> <span class="o">=</span> <span class="n">context</span><span class="p">[</span><span class="s1">'execution_date'</span><span class="p">]</span> - - <span class="n">dttm_filter</span> <span class="o">=</span> <span class="n">dttm</span> <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">dttm</span><span class="p">,</span> <span class="nb">list</span><span class="p">)</span> <span class="k">else</span> <span class="p">[</span><span class="n">dttm</span><span class="p">]</span> - <span class="n">serialized_dttm_filter</span> <span class="o">=</span> <span class="s1">','</span><span class="o">.</span><span class="n">join</span><span class="p">(</span> - <span class="p">[</span><span class="n">datetime</span><span class="o">.</span><span class="n">isoformat</span><span class="p">()</span> <span class="k">for</span> <span class="n">datetime</span> <span class="ow">in</span> <span class="n">dttm_filter</span><span class="p">])</span> - - <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span> - <span class="s1">'Poking for '</span> - <span class="s1">'</span><span class="si">{self.external_dag_id}</span><span class="s1">.'</span> - <span class="s1">'</span><span class="si">{self.external_task_id}</span><span class="s1"> on '</span> - <span class="s1">'</span><span class="si">{}</span><span class="s1"> ... '</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">serialized_dttm_filter</span><span class="p">,</span> <span class="o">**</span><span class="nb">locals</span><span class="p">()))</span> - <span class="n">TI</span> <span class="o">=</span> <span class="n">TaskInstance</span> - - <span class="n">session</span> <span class="o">=</span> <span class="n">settings</span><span class="o">.</span><span class="n">Session</span><span class="p">()</span> - <span class="n">count</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">TI</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span> - <span class="n">TI</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">external_dag_id</span><span class="p">,</span> - <span class="n">TI</span><span class="o">.</span><span class="n">task_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">external_task_id</span><span class="p">,</span> - <span class="n">TI</span><span class="o">.</span><span class="n">state</span><span class="o">.</span><span class="n">in_</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">allowed_states</span><span class="p">),</span> - <span class="n">TI</span><span class="o">.</span><span class="n">execution_date</span><span class="o">.</span><span class="n">in_</span><span class="p">(</span><span class="n">dttm_filter</span><span class="p">),</span> - <span class="p">)</span><span class="o">.</span><span class="n">count</span><span class="p">()</span> - <span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span> - <span class="n">session</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> - <span class="k">return</span> <span class="n">count</span> <span class="o">==</span> <span class="nb">len</span><span class="p">(</span><span class="n">dttm_filter</span><span class="p">)</span> - - -<span class="k">class</span> <span class="nc">NamedHivePartitionSensor</span><span class="p">(</span><span class="n">BaseSensorOperator</span><span class="p">):</span> - <span class="sd">"""</span> -<span class="sd"> Waits for a set of partitions to show up in Hive.</span> - -<span class="sd"> :param partition_names: List of fully qualified names of the</span> -<span class="sd"> partitions to wait for. A fully qualified name is of the</span> -<span class="sd"> form ``schema.table/pk1=pv1/pk2=pv2``, for example,</span> -<span class="sd"> default.users/ds=2016-01-01. This is passed as is to the metastore</span> -<span class="sd"> Thrift client ``get_partitions_by_name`` method. Note that</span> -<span class="sd"> you cannot use logical or comparison operators as in</span> -<span class="sd"> HivePartitionSensor.</span> -<span class="sd"> :type partition_names: list of strings</span> -<span class="sd"> :param metastore_conn_id: reference to the metastore thrift service</span> -<span class="sd"> connection id</span> -<span class="sd"> :type metastore_conn_id: str</span> -<span class="sd"> """</span> - - <span class="n">template_fields</span> <span class="o">=</span> <span class="p">(</span><span class="s1">'partition_names'</span><span class="p">,</span> <span class="p">)</span> - <span class="n">ui_color</span> <span class="o">=</span> <span class="s1">'#8d99ae'</span> - - <span class="nd">@apply_defaults</span> - <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span> - <span class="bp">self</span><span class="p">,</span> - <span class="n">partition_names</span><span class="p">,</span> - <span class="n">metastore_conn_id</span><span class="o">=</span><span class="s1">'metastore_default'</span><span class="p">,</span> - <span class="n">poke_interval</span><span class="o">=</span><span class="mi">60</span> <span class="o">*</span> <span class="mi">3</span><span class="p">,</span> - <span class="o">*</span><span class="n">args</span><span class="p">,</span> - <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> - <span class="nb">super</span><span class="p">(</span><span class="n">NamedHivePartitionSensor</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span> - <span class="n">poke_interval</span><span class="o">=</span><span class="n">poke_interval</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> - - <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">partition_names</span><span class="p">,</span> <span class="n">basestring</span><span class="p">):</span> - <span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s1">'partition_names must be an array of strings'</span><span class="p">)</span> - - <span class="bp">self</span><span class="o">.</span><span class="n">metastore_conn_id</span> <span class="o">=</span> <span class="n">metastore_conn_id</span> - <span class="bp">self</span><span class="o">.</span><span class="n">partition_names</span> <span class="o">=</span> <span class="n">partition_names</span> - <span class="bp">self</span><span class="o">.</span><span class="n">next_poke_idx</span> <span class="o">=</span> <span class="mi">0</span> - - <span class="nd">@classmethod</span> - <span class="k">def</span> <span class="nf">parse_partition_name</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">partition</span><span class="p">):</span> - <span class="k">try</span><span class="p">:</span> - <span class="n">schema</span><span class="p">,</span> <span class="n">table_partition</span> <span class="o">=</span> <span class="n">partition</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">'.'</span><span class="p">,</span> <span class="mi">1</span><span class="p">)</span> - <span class="n">table</span><span class="p">,</span> <span class="n">partition</span> <span class="o">=</span> <span class="n">table_partition</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">'/'</span><span class="p">,</span> <span class="mi">1</span><span class="p">)</span> - <span class="k">return</span> <span class="n">schema</span><span class="p">,</span> <span class="n">table</span><span class="p">,</span> <span class="n">partition</span> - <span class="k">except</span> <span class="ne">ValueError</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> - <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">'Could not parse '</span> <span class="o">+</span> <span class="n">partition</span><span class="p">)</span> - - <span class="k">def</span> <span class="nf">poke</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span> - <span class="k">if</span> <span class="ow">not</span> <span class="nb">hasattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">'hook'</span><span class="p">):</span> - <span class="kn">from</span> <span class="nn">airflow.hooks.hive_hooks</span> <span class="k">import</span> <span class="n">HiveMetastoreHook</span> - <span class="bp">self</span><span class="o">.</span><span class="n">hook</span> <span class="o">=</span> <span class="n">HiveMetastoreHook</span><span class="p">(</span> - <span class="n">metastore_conn_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">metastore_conn_id</span><span class="p">)</span> - - <span class="k">def</span> <span class="nf">poke_partition</span><span class="p">(</span><span class="n">partition</span><span class="p">):</span> - - <span class="n">schema</span><span class="p">,</span> <span class="n">table</span><span class="p">,</span> <span class="n">partition</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">parse_partition_name</span><span class="p">(</span><span class="n">partition</span><span class="p">)</span> - - <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span> - <span class="s1">'Poking for </span><span class="si">{schema}</span><span class="s1">.</span><span class="si">{table}</span><span class="s1">/</span><span class="si">{partition}</span><span class="s1">'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="o">**</span><span class="nb">locals</span><span class="p">())</span> - <span class="p">)</span> - <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">hook</span><span class="o">.</span><span class="n">check_for_named_partition</span><span class="p">(</span> - <span class="n">schema</span><span class="p">,</span> <span class="n">table</span><span class="p">,</span> <span class="n">partition</span><span class="p">)</span> - - <span class="k">while</span> <span class="bp">self</span><span class="o">.</span><span class="n">next_poke_idx</span> <span class="o"><</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">partition_names</span><span class="p">):</span> - <span class="k">if</span> <span class="n">poke_partition</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">partition_names</span><span class="p">[</span><span class="bp">self</span><span class="o">.</span><span class="n">next_poke_idx</span><span class="p">]):</span> - <span class="bp">self</span><span class="o">.</span><span class="n">next_poke_idx</span> <span class="o">+=</span> <span class="mi">1</span> - <span class="k">else</span><span class="p">:</span> - <span class="k">return</span> <span class="kc">False</span> - - <span class="k">return</span> <span class="kc">True</span> - - -<span class="k">class</span> <span class="nc">HivePartitionSensor</span><span class="p">(</span><span class="n">BaseSensorOperator</span><span class="p">):</span> - <span class="sd">"""</span> -<span class="sd"> Waits for a partition to show up in Hive.</span> - -<span class="sd"> Note: Because ``partition`` supports general logical operators, it</span> -<span class="sd"> can be inefficient. Consider using NamedHivePartitionSensor instead if</span> -<span class="sd"> you don't need the full flexibility of HivePartitionSensor.</span> - -<span class="sd"> :param table: The name of the table to wait for, supports the dot</span> -<span class="sd"> notation (my_database.my_table)</span> -<span class="sd"> :type table: string</span> -<span class="sd"> :param partition: The partition clause to wait for. This is passed as</span> -<span class="sd"> is to the metastore Thrift client ``get_partitions_by_filter`` method,</span> -<span class="sd"> and apparently supports SQL like notation as in ``ds='2015-01-01'</span> -<span class="sd"> AND type='value'`` and comparison operators as in ``"ds>=2015-01-01"``</span> -<span class="sd"> :type partition: string</span> -<span class="sd"> :param metastore_conn_id: reference to the metastore thrift service</span> -<span class="sd"> connection id</span> -<span class="sd"> :type metastore_conn_id: str</span> -<span class="sd"> """</span> - <span class="n">template_fields</span> <span class="o">=</span> <span class="p">(</span><span class="s1">'schema'</span><span class="p">,</span> <span class="s1">'table'</span><span class="p">,</span> <span class="s1">'partition'</span><span class="p">,)</span> - <span class="n">ui_color</span> <span class="o">=</span> <span class="s1">'#C5CAE9'</span> - - <span class="nd">@apply_defaults</span> - <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span> - <span class="bp">self</span><span class="p">,</span> - <span class="n">table</span><span class="p">,</span> <span class="n">partition</span><span class="o">=</span><span class="s2">"ds='{{ ds }}'"</span><span class="p">,</span> - <span class="n">metastore_conn_id</span><span class="o">=</span><span class="s1">'metastore_default'</span><span class="p">,</span> - <span class="n">schema</span><span class="o">=</span><span class="s1">'default'</span><span class="p">,</span> - <span class="n">poke_interval</span><span class="o">=</span><span class="mi">60</span><span class="o">*</span><span class="mi">3</span><span class="p">,</span> - <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> - <span class="nb">super</span><span class="p">(</span><span class="n">HivePartitionSensor</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span> - <span class="n">poke_interval</span><span class="o">=</span><span class="n">poke_interval</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> - <span class="k">if</span> <span class="ow">not</span> <span class="n">partition</span><span class="p">:</span> - <span class="n">partition</span> <span class="o">=</span> <span class="s2">"ds='{{ ds }}'"</span> - <span class="bp">self</span><span class="o">.</span><span class="n">metastore_conn_id</span> <span class="o">=</span> <span class="n">metastore_conn_id</span> - <span class="bp">self</span><span class="o">.</span><span class="n">table</span> <span class="o">=</span> <span class="n">table</span> - <span class="bp">self</span><span class="o">.</span><span class="n">partition</span> <span class="o">=</span> <span class="n">partition</span> - <span class="bp">self</span><span class="o">.</span><span class="n">schema</span> <span class="o">=</span> <span class="n">schema</span> - - <span class="k">def</span> <span class="nf">poke</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span> - <span class="k">if</span> <span class="s1">'.'</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">table</span><span class="p">:</span> - <span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">table</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">table</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">'.'</span><span class="p">)</span> - <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span> - <span class="s1">'Poking for table </span><span class="si">{self.schema}</span><span class="s1">.</span><span class="si">{self.table}</span><span class="s1">, '</span> - <span class="s1">'partition </span><span class="si">{self.partition}</span><span class="s1">'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="o">**</span><span class="nb">locals</span><span class="p">()))</span> - <span class="k">if</span> <span class="ow">not</span> <span class="nb">hasattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">'hook'</span><span class="p">):</span> - <span class="kn">from</span> <span class="nn">airflow.hooks.hive_hooks</span> <span class="k">import</span> <span class="n">HiveMetastoreHook</span> - <span class="bp">self</span><span class="o">.</span><span class="n">hook</span> <span class="o">=</span> <span class="n">HiveMetastoreHook</span><span class="p">(</span> - <span class="n">metastore_conn_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">metastore_conn_id</span><span class="p">)</span> - <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">hook</span><span class="o">.</span><span class="n">check_for_partition</span><span class="p">(</span> - <span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">table</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">partition</span><span class="p">)</span> - - -<span class="k">class</span> <span class="nc">HdfsSensor</span><span class="p">(</span><span class="n">BaseSensorOperator</span><span class="p">):</span> - <span class="sd">"""</span> -<span class="sd"> Waits for a file or folder to land in HDFS</span> -<span class="sd"> """</span> - <span class="n">template_fields</span> <span class="o">=</span> <span class="p">(</span><span class="s1">'filepath'</span><span class="p">,)</span> - <span class="n">ui_color</span> <span class="o">=</span> <span class="n">settings</span><span class="o">.</span><span class="n">WEB_COLORS</span><span class="p">[</span><span class="s1">'LIGHTBLUE'</span><span class="p">]</span> - - <span class="nd">@apply_defaults</span> - <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span> - <span class="bp">self</span><span class="p">,</span> - <span class="n">filepath</span><span class="p">,</span> - <span class="n">hdfs_conn_id</span><span class="o">=</span><span class="s1">'hdfs_default'</span><span class="p">,</span> - <span class="n">ignored_ext</span><span class="o">=</span><span class="p">[</span><span class="s1">'_COPYING_'</span><span class="p">],</span> - <span class="n">ignore_copying</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> - <span class="n">file_size</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> - <span class="n">hook</span><span class="o">=</span><span class="n">HDFSHook</span><span class="p">,</span> - <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> - <span class="nb">super</span><span class="p">(</span><span class="n">HdfsSensor</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> - <span class="bp">self</span><span class="o">.</span><span class="n">filepath</span> <span class="o">=</span> <span class="n">filepath</span> - <span class="bp">self</span><span class="o">.</span><span class="n">hdfs_conn_id</span> <span class="o">=</span> <span class="n">hdfs_conn_id</span> - <span class="bp">self</span><span class="o">.</span><span class="n">file_size</span> <span class="o">=</span> <span class="n">file_size</span> - <span class="bp">self</span><span class="o">.</span><span class="n">ignored_ext</span> <span class="o">=</span> <span class="n">ignored_ext</span> - <span class="bp">self</span><span class="o">.</span><span class="n">ignore_copying</span> <span class="o">=</span> <span class="n">ignore_copying</span> - <span class="bp">self</span><span class="o">.</span><span class="n">hook</span> <span class="o">=</span> <span class="n">hook</span> - - <span class="nd">@staticmethod</span> - <span class="k">def</span> <span class="nf">filter_for_filesize</span><span class="p">(</span><span class="n">result</span><span class="p">,</span> <span class="n">size</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> - <span class="sd">"""</span> -<span class="sd"> Will test the filepath result and test if its size is at least self.filesize</span> - -<span class="sd"> :param result: a list of dicts returned by Snakebite ls</span> -<span class="sd"> :param size: the file size in MB a file should be at least to trigger True</span> -<span class="sd"> :return: (bool) depending on the matching criteria</span> -<span class="sd"> """</span> - <span class="k">if</span> <span class="n">size</span><span class="p">:</span> - <span class="n">log</span> <span class="o">=</span> <span class="n">LoggingMixin</span><span class="p">()</span><span class="o">.</span><span class="n">log</span> - <span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s1">'Filtering for file size >= </span><span class="si">%s</span><span class="s1"> in files: </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> <span class="n">size</span><span class="p">,</span> <span class="nb">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">x</span><span class="p">[</span><span class="s1">'path'</span><span class="p">],</span> <span class="n">result</span><span class="p">))</span> - <span class="n">size</span> <span class="o">*=</span> <span class="n">settings</span><span class="o">.</span><span class="n">MEGABYTE</span> - <span class="n">result</span> <span class="o">=</span> <span class="p">[</span><span class="n">x</span> <span class="k">for</span> <span class="n">x</span> <span class="ow">in</span> <span class="n">result</span> <span class="k">if</span> <span class="n">x</span><span class="p">[</span><span class="s1">'length'</span><span class="p">]</span> <span class="o">>=</span> <span class="n">size</span><span class="p">]</span> - <span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s1">'HdfsSensor.poke: after size filter result is </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> <span class="n">result</span><span class="p">)</span> - <span class="k">return</span> <span class="n">result</span> - - <span class="nd">@staticmethod</span> - <span class="k">def</span> <span class="nf">filter_for_ignored_ext</span><span class="p">(</span><span class="n">result</span><span class="p">,</span> <span class="n">ignored_ext</span><span class="p">,</span> <span class="n">ignore_copying</span><span class="p">):</span> - <span class="sd">"""</span> -<span class="sd"> Will filter if instructed to do so the result to remove matching criteria</span> - -<span class="sd"> :param result: (list) of dicts returned by Snakebite ls</span> -<span class="sd"> :param ignored_ext: (list) of ignored extensions</span> -<span class="sd"> :param ignore_copying: (bool) shall we ignore ?</span> -<span class="sd"> :return: (list) of dicts which were not removed</span> -<span class="sd"> """</span> - <span class="k">if</span> <span class="n">ignore_copying</span><span class="p">:</span> - <span class="n">log</span> <span class="o">=</span> <span class="n">LoggingMixin</span><span class="p">()</span><span class="o">.</span><span class="n">log</span> - <span class="n">regex_builder</span> <span class="o">=</span> <span class="s2">"^.*\.(</span><span class="si">%s</span><span class="s2">$)$"</span> <span class="o">%</span> <span class="s1">'$|'</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">ignored_ext</span><span class="p">)</span> - <span class="n">ignored_extentions_regex</span> <span class="o">=</span> <span class="n">re</span><span class="o">.</span><span class="n">compile</span><span class="p">(</span><span class="n">regex_builder</span><span class="p">)</span> - <span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span> - <span class="s1">'Filtering result for ignored extensions: </span><span class="si">%s</span><span class="s1"> in files </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> - <span class="n">ignored_extentions_regex</span><span class="o">.</span><span class="n">pattern</span><span class="p">,</span> <span class="nb">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">x</span><span class="p">[</span><span class="s1">'path'</span><span class="p">],</span> <span class="n">result</span><span class="p">)</span> - <span class="p">)</span> - <span class="n">result</span> <span class="o">=</span> <span class="p">[</span><span class="n">x</span> <span class="k">for</span> <span class="n">x</span> <span class="ow">in</span> <span class="n">result</span> <span class="k">if</span> <span class="ow">not</span> <span class="n">ignored_extentions_regex</span><span class="o">.</span><span class="n">match</span><span class="p">(</span><span class="n">x</span><span class="p">[</span><span class="s1">'path'</span><span class="p">])]</span> - <span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s1">'HdfsSensor.poke: after ext filter result is </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> <span class="n">result</span><span class="p">)</span> - <span class="k">return</span> <span class="n">result</span> - - <span class="k">def</span> <span class="nf">poke</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span> - <span class="n">sb</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">hook</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">hdfs_conn_id</span><span class="p">)</span><span class="o">.</span><span class="n">get_conn</span><span class="p">()</span> - <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Poking for file </span><span class="si">{self.filepath}</span><span class="s1">'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="o">**</span><span class="nb">locals</span><span class="p">()))</span> - <span class="k">try</span><span class="p">:</span> - <span class="c1"># IMOO it's not right here, as there no raise of any kind.</span> - <span class="c1"># if the filepath is let's say '/data/mydirectory', it's correct but if it is '/data/mydirectory/*',</span> - <span class="c1"># it's not correct as the directory exists and sb does not raise any error</span> - <span class="c1"># here is a quick fix</span> - <span class="n">result</span> <span class="o">=</span> <span class="p">[</span><span class="n">f</span> <span class="k">for</span> <span class="n">f</span> <span class="ow">in</span> <span class="n">sb</span><span class="o">.</span><span class="n">ls</span><span class="p">([</span><span class="bp">self</span><span class="o">.</span><span class="n">filepath</span><span class="p">],</span> <span class="n">include_toplevel</span><span class="o">=</span><span class="kc">False</span><span class="p">)]</span> - <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s1">'HdfsSensor.poke: result is </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> <span class="n">result</span><span class="p">)</span> - <span class="n">result</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">filter_for_ignored_ext</span><span class="p">(</span><span class="n">result</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">ignored_ext</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">ignore_copying</span><span class="p">)</span> - <span class="n">result</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">filter_for_filesize</span><span class="p">(</span><span class="n">result</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">file_size</span><span class="p">)</span> - <span class="k">return</span> <span class="nb">bool</span><span class="p">(</span><span class="n">result</span><span class="p">)</span> - <span class="k">except</span><span class="p">:</span> - <span class="n">e</span> <span class="o">=</span> <span class="n">sys</span><span class="o">.</span><span class="n">exc_info</span><span class="p">()</span> - <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s2">"Caught an exception !: </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> <span class="nb">str</span><span class="p">(</span><span class="n">e</span><span class="p">))</span> - <span class="k">return</span> <span class="kc">False</span> - - -<span class="k">class</span> <span class="nc">WebHdfsSensor</span><span class="p">(</span><span class="n">BaseSensorOperator</span><span class="p">):</span> - <span class="sd">"""</span> -<span class="sd"> Waits for a file or folder to land in HDFS</span> -<span class="sd"> """</span> - <span class="n">template_fields</span> <span class="o">=</span> <span class="p">(</span><span class="s1">'filepath'</span><span class="p">,)</span> - - <span class="nd">@apply_defaults</span> - <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span> - <span class="bp">self</span><span class="p">,</span> - <span class="n">filepath</span><span class="p">,</span> - <span class="n">webhdfs_conn_id</span><span class="o">=</span><span class="s1">'webhdfs_default'</span><span class="p">,</span> - <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> - <span class="nb">super</span><span class="p">(</span><span class="n">WebHdfsSensor</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> - <span class="bp">self</span><span class="o">.</span><span class="n">filepath</span> <span class="o">=</span> <span class="n">filepath</span> - <span class="bp">self</span><span class="o">.</span><span class="n">webhdfs_conn_id</span> <span class="o">=</span> <span class="n">webhdfs_conn_id</span> - - <span class="k">def</span> <span class="nf">poke</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span> - <span class="kn">from</span> <span class="nn">airflow.hooks.webhdfs_hook</span> <span class="k">import</span> <span class="n">WebHDFSHook</span> - <span class="n">c</span> <span class="o">=</span> <span class="n">WebHDFSHook</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">webhdfs_conn_id</span><span class="p">)</span> - <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Poking for file </span><span class="si">{self.filepath}</span><span class="s1">'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="o">**</span><span class="nb">locals</span><span class="p">()))</span> - <span class="k">return</span> <span class="n">c</span><span class="o">.</span><span class="n">check_for_path</span><span class="p">(</span><span class="n">hdfs_path</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">filepath</span><span class="p">)</span> - - -<span class="k">class</span> <span class="nc">S3KeySensor</span><span class="p">(</span><span class="n">BaseSensorOperator</span><span class="p">):</span> - <span class="sd">"""</span> -<span class="sd"> Waits for a key (a file-like instance on S3) to be present in a S3 bucket.</span> -<span class="sd"> S3 being a key/value it does not support folders. The path is just a key</span> -<span class="sd"> a resource.</span> - -<span class="sd"> :param bucket_key: The key being waited on. Supports full s3:// style url</span> -<span class="sd"> or relative path from root level.</span> -<span class="sd"> :type bucket_key: str</span> -<span class="sd"> :param bucket_name: Name of the S3 bucket</span> -<span class="sd"> :type bucket_name: str</span> -<span class="sd"> :param wildcard_match: whether the bucket_key should be interpreted as a</span> -<span class="sd"> Unix wildcard pattern</span> -<span class="sd"> :type wildcard_match: bool</span> -<span class="sd"> :param aws_conn_id: a reference to the s3 connection</span> -<span class="sd"> :type aws_conn_id: str</span> -<span class="sd"> """</span> - <span class="n">template_fields</span> <span class="o">=</span> <span class="p">(</span><span class="s1">'bucket_key'</span><span class="p">,</span> <span class="s1">'bucket_name'</span><span class="p">)</span> - - <span class="nd">@apply_defaults</span> - <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span> - <span class="bp">self</span><span class="p">,</span> <span class="n">bucket_key</span><span class="p">,</span> - <span class="n">bucket_name</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> - <span class="n">wildcard_match</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> - <span class="n">aws_conn_id</span><span class="o">=</span><span class="s1">'aws_default'</span><span class="p">,</span> - <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> - <span class="nb">super</span><span class="p">(</span><span class="n">S3KeySensor</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> - <span class="c1"># Parse</span> - <span class="k">if</span> <span class="n">bucket_name</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> - <span class="n">parsed_url</span> <span class="o">=</span> <span class="n">urlparse</span><span class="p">(</span><span class="n">bucket_key</span><span class="p">)</span> - <span class="k">if</span> <span class="n">parsed_url</span><span class="o">.</span><span class="n">netloc</span> <span class="o">==</span> <span class="s1">''</span><span class="p">:</span> - <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span><span class="s1">'Please provide a bucket_name'</span><span class="p">)</span> - <span class="k">else</span><span class="p">:</span> - <span class="n">bucket_name</span> <span class="o">=</span> <span class="n">parsed_url</span><span class="o">.</span><span class="n">netloc</span> - <span class="k">if</span> <span class="n">parsed_url</span><span class="o">.</span><span class="n">path</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span> <span class="o">==</span> <span class="s1">'/'</span><span class="p">:</span> - <span class="n">bucket_key</span> <span class="o">=</span> <span class="n">parsed_url</span><span class="o">.</span><span class="n">path</span><span class="p">[</span><span class="mi">1</span><span class="p">:]</span> - <span class="k">else</span><span class="p">:</span> - <span class="n">bucket_key</span> <span class="o">=</span> <span class="n">parsed_url</span><span class="o">.</span><span class="n">path</span> - <span class="bp">self</span><span class="o">.</span><span class="n">bucket_name</span> <span class="o">=</span> <span class="n">bucket_name</span> - <span class="bp">self</span><span class="o">.</span><span class="n">bucket_key</span> <span class="o">=</span> <span class="n">bucket_key</span> - <span class="bp">self</span><span class="o">.</span><span class="n">wildcard_match</span> <span class="o">=</span> <span class="n">wildcard_match</span> - <span class="bp">self</span><span class="o">.</span><span class="n">aws_conn_id</span> <span class="o">=</span> <span class="n">aws_conn_id</span> - - <span class="k">def</span> <span class="nf">poke</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span> - <span class="kn">from</span> <span class="nn">airflow.hooks.S3_hook</span> <span class="k">import</span> <span class="n">S3Hook</span> - <span class="n">hook</span> <span class="o">=</span> <span class="n">S3Hook</span><span class="p">(</span><span class="n">aws_conn_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">aws_conn_id</span><span class="p">)</span> - <span class="n">full_url</span> <span class="o">=</span> <span class="s2">"s3://"</span> <span class="o">+</span> <span class="bp">self</span><span class="o">.</span><span class="n">bucket_name</span> <span class="o">+</span> <span class="s2">"/"</span> <span class="o">+</span> <span class="bp">self</span><span class="o">.</span><span class="n">bucket_key</span> - <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Poking for key : </span><span class="si">{full_url}</span><span class="s1">'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="o">**</span><span class="nb">locals</span><span class="p">()))</span> - <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">wildcard_match</span><span class="p">:</span> - <span class="k">return</span> <span class="n">hook</span><span class="o">.</span><span class="n">check_for_wildcard_key</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">bucket_key</span><span class="p">,</span> - <span class="bp">self</span><span class="o">.</span><span class="n">bucket_name</span><span class="p">)</span> - <span class="k">else</span><span class="p">:</span> - <span class="k">return</span> <span class="n">hook</span><span class="o">.</span><span class="n">check_for_key</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">bucket_key</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">bucket_name</span><span class="p">)</span> - - -<span class="k">class</span> <span class="nc">S3PrefixSensor</span><span class="p">(</span><span class="n">BaseSensorOperator</span><span class="p">):</span> - <span class="sd">"""</span> -<span class="sd"> Waits for a prefix to exist. A prefix is the first part of a key,</span> -<span class="sd"> thus enabling checking of constructs similar to glob airfl* or</span> -<span class="sd"> SQL LIKE 'airfl%'. There is the possibility to precise a delimiter to</span> -<span class="sd"> indicate the hierarchy or keys, meaning that the match will stop at that</span> -<span class="sd"> delimiter. Current code accepts sane delimiters, i.e. characters that</span> -<span class="sd"> are NOT special characters in the Python regex engine.</span> - -<span class="sd"> :param bucket_name: Name of the S3 bucket</span> -<span class="sd"> :type bucket_name: str</span> -<span class="sd"> :param prefix: The prefix being waited on. Relative path from bucket root level.</span> -<span class="sd"> :type prefix: str</span> -<span class="sd"> :param delimiter: The delimiter intended to show hierarchy.</span> -<span class="sd"> Defaults to '/'.</span> -<span class="sd"> :type delimiter: str</span> -<span class="sd"> """</span> - <span class="n">template_fields</span> <span class="o">=</span> <span class="p">(</span><span class="s1">'prefix'</span><span class="p">,</span> <span class="s1">'bucket_name'</span><span class="p">)</span> - - <span class="nd">@apply_defaults</span> - <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span> - <span class="bp">self</span><span class="p">,</span> <span class="n">bucket_name</span><span class="p">,</span> - <span class="n">prefix</span><span class="p">,</span> <span class="n">delimiter</span><span class="o">=</span><span class="s1">'/'</span><span class="p">,</span> - <span class="n">aws_conn_id</span><span class="o">=</span><span class="s1">'aws_default'</span><span class="p">,</span> - <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> - <span class="nb">super</span><span class="p">(</span><span class="n">S3PrefixSensor</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> - <span class="c1"># Parse</span> - <span class="bp">self</span><span class="o">.</span><span class="n">bucket_name</span> <span class="o">=</span> <span class="n">bucket_name</span> - <span class="bp">self</span><span class="o">.</span><span class="n">prefix</span> <span class="o">=</span> <span class="n">prefix</span> - <span class="bp">self</span><span class="o">.</span><span class="n">delimiter</span> <span class="o">=</span> <span class="n">delimiter</span> - <span class="bp">self</span><span class="o">.</span><span class="n">full_url</span> <span class="o">=</span> <span class="s2">"s3://"</span> <span class="o">+</span> <span class="n">bucket_name</span> <span class="o">+</span> <span class="s1">'/'</span> <span class="o">+</span> <span class="n">prefix</span> - <span class="bp">self</span><span class="o">.</span><span class="n">aws_conn_id</span> <span class="o">=</span> <span class="n">aws_conn_id</span> - - <span class="k">def</span> <span class="nf">poke</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span> - <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Poking for prefix : </span><span class="si">{self.prefix}</span><span class="se">\n</span><span class="s1">'</span> - <span class="s1">'in bucket s3://</span><span class="si">{self.bucket_name}</span><span class="s1">'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="o">**</span><span class="nb">locals</span><span class="p">()))</span> - <span class="kn">from</span> <span class="nn">airflow.hooks.S3_hook</span> <span class="k">import</span> <span class="n">S3Hook</span> - <span class="n">hook</span> <span class="o">=</span> <span class="n">S3Hook</span><span class="p">(</span><span class="n">aws_conn_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">aws_conn_id</span><span class="p">)</span> - <span class="k">return</span> <span class="n">hook</span><span class="o">.</span><span class="n">check_for_prefix</span><span class="p">(</span> - <span class="n">prefix</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">prefix</span><span class="p">,</span> - <span class="n">delimiter</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">delimiter</span><span class="p">,</span> - <span class="n">bucket_name</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">bucket_name</span><span class="p">)</span> - - -<span class="k">class</span> <span class="nc">TimeSensor</span><span class="p">(</span><span class="n">BaseSensorOperator</span><span class="p">):</span> - <span class="sd">"""</span> -<span class="sd"> Waits until the specified time of the day.</span> - -<span class="sd"> :param target_time: time after which the job succeeds</span> -<span class="sd"> :type target_time: datetime.time</span> -<span class="sd"> """</span> - <span class="n">template_fields</span> <span class="o">=</span> <span class="nb">tuple</span><span class="p">()</span> - - <span class="nd">@apply_defaults</span> - <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">target_time</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> - <span class="nb">super</span><span class="p">(</span><span class="n">TimeSensor</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> - <span class="bp">self</span><span class="o">.</span><span class="n">target_time</span> <span class="o">=</span> <span class="n">target_time</span> - - <span class="k">def</span> <span class="nf">poke</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span> - <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Checking if the time (</span><span class="si">%s</span><span class="s1">) has come'</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">target_time</span><span class="p">)</span> - <span class="k">return</span> <span class="n">datetime</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span><span class="o">.</span><span class="n">time</span><span class="p">()</span> <span class="o">></span> <span class="bp">self</span><span class="o">.</span><span class="n">target_time</span> - - -<span class="k">class</span> <span class="nc">TimeDeltaSensor</span><span class="p">(</span><span class="n">BaseSensorOperator</span><span class="p">):</span> - <span class="sd">"""</span> -<span class="sd"> Waits for a timedelta after the task's execution_date + schedule_interval.</span> -<span class="sd"> In Airflow, the daily task stamped with ``execution_date``</span> -<span class="sd"> 2016-01-01 can only start running on 2016-01-02. The timedelta here</span> -<span class="sd"> represents the time after the execution period has closed.</span> - -<span class="sd"> :param delta: time length to wait after execution_date before succeeding</span> -<span class="sd"> :type delta: datetime.timedelta</span> -<span class="sd"> """</span> - <span class="n">template_fields</span> <span class="o">=</span> <span class="nb">tuple</span><span class="p">()</span> - - <span class="nd">@apply_defaults</span> - <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">delta</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> - <span class="nb">super</span><span class="p">(</span><span class="n">TimeDeltaSensor</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> - <span class="bp">self</span><span class="o">.</span><span class="n">delta</span> <span class="o">=</span> <span class="n">delta</span> - - <span class="k">def</span> <span class="nf">poke</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span> - <span class="n">dag</span> <span class="o">=</span> <span class="n">context</span><span class="p">[</span><span class="s1">'dag'</span><span class="p">]</span> - <span class="n">target_dttm</span> <span class="o">=</span> <span class="n">dag</span><span class="o">.</span><span class="n">following_schedule</span><span class="p">(</span><span class="n">context</span><span class="p">[</span><span class="s1">'execution_date'</span><span class="p">])</span> - <span class="n">target_dttm</span> <span class="o">+=</span> <span class="bp">self</span><span class="o">.</span><span class="n">delta</span> - <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Checking if the time (</span><span class="si">%s</span><span class="s1">) has come'</span><span class="p">,</span> <span class="n">target_dttm</span><span class="p">)</span> - <span class="k">return</span> <span class="n">datetime</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span> <span class="o">></span> <span class="n">target_dttm</span> - - -<span class="k">class</span> <span class="nc">HttpSensor</span><span class="p">(</span><span class="n">BaseSensorOperator</span><span class="p">):</span> - <span class="sd">"""</span> -<span class="sd"> Executes a HTTP get statement and returns False on failure:</span> -<span class="sd"> 404 not found or response_check function returned False</span> - -<span class="sd"> :param http_conn_id: The connection to run the sensor against</span> -<span class="sd"> :type http_conn_id: string</span> -<span class="sd"> :param method: The HTTP request method to use</span> -<span class="sd"> :type method: string</span> -<span class="sd"> :param endpoint: The relative part of the full url</span> -<span class="sd"> :type endpoint: string</span> -<span class="sd"> :param request_params: The parameters to be added to the GET url</span> -<span class="sd"> :type request_params: a dictionary of string key/value pairs</span> -<span class="sd"> :param headers: The HTTP headers to be added to the GET request</span> -<span class="sd"> :type headers: a dictionary of string key/value pairs</span> -<span class="sd"> :param response_check: A check against the 'requests' response object.</span> -<span class="sd"> Returns True for 'pass' and False otherwise.</span> -<span class="sd"> :type response_check: A lambda or defined function.</span> -<span class="sd"> :param extra_options: Extra options for the 'requests' library, see the</span> -<span class="sd"> 'requests' documentation (options to modify timeout, ssl, etc.)</span> -<span class="sd"> :type extra_options: A dictionary of options, where key is string and value</span> -<span class="sd"> depends on the option that's being modified.</span> -<span class="sd"> ""&quo
<TRUNCATED>