http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/blob/9c75ee9e/_modules/sensors.html ---------------------------------------------------------------------- diff --git a/_modules/sensors.html b/_modules/sensors.html index 6567e38..f2b1cfe 100644 --- a/_modules/sensors.html +++ b/_modules/sensors.html @@ -30,6 +30,9 @@ + <link rel="index" title="Index" + href="../genindex.html"/> + <link rel="search" title="Search" href="../search.html"/> <link rel="top" title="Airflow Documentation" href="../index.html"/> <link rel="up" title="Module code" href="index.html"/> @@ -40,6 +43,7 @@ <body class="wy-body-for-nav" role="document"> + <div class="wy-grid-for-nav"> @@ -90,6 +94,8 @@ <li class="toctree-l1"><a class="reference internal" href="../scheduler.html">Scheduling & Triggers</a></li> <li class="toctree-l1"><a class="reference internal" href="../plugins.html">Plugins</a></li> <li class="toctree-l1"><a class="reference internal" href="../security.html">Security</a></li> +<li class="toctree-l1"><a class="reference internal" href="../api.html">Experimental Rest API</a></li> +<li class="toctree-l1"><a class="reference internal" href="../integration.html">Integration</a></li> <li class="toctree-l1"><a class="reference internal" href="../faq.html">FAQ</a></li> <li class="toctree-l1"><a class="reference internal" href="../code.html">API Reference</a></li> </ul> @@ -104,8 +110,10 @@ <nav class="wy-nav-top" role="navigation" aria-label="top navigation"> - <i data-toggle="wy-nav-top" class="fa fa-bars"></i> - <a href="../index.html">Airflow</a> + + <i data-toggle="wy-nav-top" class="fa fa-bars"></i> + <a href="../index.html">Airflow</a> + </nav> @@ -118,19 +126,36 @@ + + + + + + + + + + <div role="navigation" aria-label="breadcrumbs navigation"> + <ul class="wy-breadcrumbs"> - <li><a href="../index.html">Docs</a> »</li> - + + <li><a href="../index.html">Docs</a> »</li> + <li><a href="index.html">Module code</a> »</li> - - <li>sensors</li> + + <li>sensors</li> + + <li class="wy-breadcrumbs-aside"> - + </li> + </ul> + + <hr/> </div> <div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article"> @@ -151,24 +176,27 @@ <span class="c1"># See the License for the specific language governing permissions and</span> <span class="c1"># limitations under the License.</span> -<span class="kn">from</span> <span class="nn">__future__</span> <span class="kn">import</span> <span class="n">print_function</span> -<span class="kn">from</span> <span class="nn">future</span> <span class="kn">import</span> <span class="n">standard_library</span> +<span class="kn">from</span> <span class="nn">__future__</span> <span class="k">import</span> <span class="n">print_function</span> +<span class="kn">from</span> <span class="nn">future</span> <span class="k">import</span> <span class="n">standard_library</span> <span class="n">standard_library</span><span class="o">.</span><span class="n">install_aliases</span><span class="p">()</span> -<span class="kn">from</span> <span class="nn">builtins</span> <span class="kn">import</span> <span class="nb">str</span> -<span class="kn">from</span> <span class="nn">past.builtins</span> <span class="kn">import</span> <span class="nb">basestring</span> +<span class="kn">from</span> <span class="nn">builtins</span> <span class="k">import</span> <span class="nb">str</span> +<span class="kn">from</span> <span class="nn">past.builtins</span> <span class="k">import</span> <span class="n">basestring</span> -<span class="kn">from</span> <span class="nn">datetime</span> <span class="kn">import</span> <span class="n">datetime</span> +<span class="kn">from</span> <span class="nn">datetime</span> <span class="k">import</span> <span class="n">datetime</span> <span class="kn">import</span> <span class="nn">logging</span> -<span class="kn">from</span> <span class="nn">urllib.parse</span> <span class="kn">import</span> <span class="n">urlparse</span> -<span class="kn">from</span> <span class="nn">time</span> <span class="kn">import</span> <span class="n">sleep</span> +<span class="kn">from</span> <span class="nn">urllib.parse</span> <span class="k">import</span> <span class="n">urlparse</span> +<span class="kn">from</span> <span class="nn">time</span> <span class="k">import</span> <span class="n">sleep</span> +<span class="kn">import</span> <span class="nn">re</span> +<span class="kn">import</span> <span class="nn">sys</span> <span class="kn">import</span> <span class="nn">airflow</span> -<span class="kn">from</span> <span class="nn">airflow</span> <span class="kn">import</span> <span class="n">hooks</span><span class="p">,</span> <span class="n">settings</span> -<span class="kn">from</span> <span class="nn">airflow.exceptions</span> <span class="kn">import</span> <span class="n">AirflowException</span><span class="p">,</span> <span class="n">AirflowSensorTimeout</span><span class="p">,</span> <span class="n">AirflowSkipException</span> -<span class="kn">from</span> <span class="nn">airflow.models</span> <span class="kn">import</span> <span class="n">BaseOperator</span><span class="p">,</span> <span class="n">TaskInstance</span><span class="p">,</span> <span class="n">Connection</span> <span class="k">as</span> <span class="n">DB</span> -<span class="kn">from</span> <span class="nn">airflow.hooks.base_hook</span> <span class="kn">import</span> <span class="n">BaseHook</span> -<span class="kn">from</span> <span class="nn">airflow.utils.state</span> <span class="kn">import</span> <span class="n">State</span> -<span class="kn">from</span> <span class="nn">airflow.utils.decorators</span> <span class="kn">import</span> <span class="n">apply_defaults</span> +<span class="kn">from</span> <span class="nn">airflow</span> <span class="k">import</span> <span class="n">hooks</span><span class="p">,</span> <span class="n">settings</span> +<span class="kn">from</span> <span class="nn">airflow.exceptions</span> <span class="k">import</span> <span class="n">AirflowException</span><span class="p">,</span> <span class="n">AirflowSensorTimeout</span><span class="p">,</span> <span class="n">AirflowSkipException</span> +<span class="kn">from</span> <span class="nn">airflow.models</span> <span class="k">import</span> <span class="n">BaseOperator</span><span class="p">,</span> <span class="n">TaskInstance</span> +<span class="kn">from</span> <span class="nn">airflow.hooks.base_hook</span> <span class="k">import</span> <span class="n">BaseHook</span> +<span class="kn">from</span> <span class="nn">airflow.hooks.hdfs_hook</span> <span class="k">import</span> <span class="n">HDFSHook</span> +<span class="kn">from</span> <span class="nn">airflow.utils.state</span> <span class="k">import</span> <span class="n">State</span> +<span class="kn">from</span> <span class="nn">airflow.utils.decorators</span> <span class="k">import</span> <span class="n">apply_defaults</span> <span class="k">class</span> <span class="nc">BaseSensorOperator</span><span class="p">(</span><span class="n">BaseOperator</span><span class="p">):</span> @@ -193,7 +221,7 @@ <span class="bp">self</span><span class="p">,</span> <span class="n">poke_interval</span><span class="o">=</span><span class="mi">60</span><span class="p">,</span> <span class="n">timeout</span><span class="o">=</span><span class="mi">60</span><span class="o">*</span><span class="mi">60</span><span class="o">*</span><span class="mi">24</span><span class="o">*</span><span class="mi">7</span><span class="p">,</span> - <span class="n">soft_fail</span><span class="o">=</span><span class="bp">False</span><span class="p">,</span> + <span class="n">soft_fail</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> <span class="nb">super</span><span class="p">(</span><span class="n">BaseSensorOperator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="n">__init__</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> <span class="bp">self</span><span class="o">.</span><span class="n">poke_interval</span> <span class="o">=</span> <span class="n">poke_interval</span> @@ -245,13 +273,13 @@ <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Poking: '</span> <span class="o">+</span> <span class="bp">self</span><span class="o">.</span><span class="n">sql</span><span class="p">)</span> <span class="n">records</span> <span class="o">=</span> <span class="n">hook</span><span class="o">.</span><span class="n">get_records</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sql</span><span class="p">)</span> <span class="k">if</span> <span class="ow">not</span> <span class="n">records</span><span class="p">:</span> - <span class="k">return</span> <span class="bp">False</span> + <span class="k">return</span> <span class="kc">False</span> <span class="k">else</span><span class="p">:</span> <span class="k">if</span> <span class="nb">str</span><span class="p">(</span><span class="n">records</span><span class="p">[</span><span class="mi">0</span><span class="p">][</span><span class="mi">0</span><span class="p">])</span> <span class="ow">in</span> <span class="p">(</span><span class="s1">'0'</span><span class="p">,</span> <span class="s1">''</span><span class="p">,):</span> - <span class="k">return</span> <span class="bp">False</span> + <span class="k">return</span> <span class="kc">False</span> <span class="k">else</span><span class="p">:</span> - <span class="k">return</span> <span class="bp">True</span> - <span class="k">print</span><span class="p">(</span><span class="n">records</span><span class="p">[</span><span class="mi">0</span><span class="p">][</span><span class="mi">0</span><span class="p">])</span></div> + <span class="k">return</span> <span class="kc">True</span> + <span class="nb">print</span><span class="p">(</span><span class="n">records</span><span class="p">[</span><span class="mi">0</span><span class="p">][</span><span class="mi">0</span><span class="p">])</span></div> <div class="viewcode-block" id="MetastorePartitionSensor"><a class="viewcode-back" href="../code.html#airflow.operators.MetastorePartitionSensor">[docs]</a><span class="k">class</span> <span class="nc">MetastorePartitionSensor</span><span class="p">(</span><span class="n">SqlSensor</span><span class="p">):</span> @@ -286,13 +314,18 @@ <span class="bp">self</span><span class="o">.</span><span class="n">partition_name</span> <span class="o">=</span> <span class="n">partition_name</span> <span class="bp">self</span><span class="o">.</span><span class="n">table</span> <span class="o">=</span> <span class="n">table</span> <span class="bp">self</span><span class="o">.</span><span class="n">schema</span> <span class="o">=</span> <span class="n">schema</span> - <span class="bp">self</span><span class="o">.</span><span class="n">first_poke</span> <span class="o">=</span> <span class="bp">True</span> + <span class="bp">self</span><span class="o">.</span><span class="n">first_poke</span> <span class="o">=</span> <span class="kc">True</span> <span class="bp">self</span><span class="o">.</span><span class="n">conn_id</span> <span class="o">=</span> <span class="n">mysql_conn_id</span> + <span class="c1"># TODO(aoen): We shouldn't be using SqlSensor here but MetastorePartitionSensor.</span> + <span class="c1"># The problem is the way apply_defaults works isn't compatible with inheritance.</span> + <span class="c1"># The inheritance model needs to be reworked in order to support overriding args/</span> + <span class="c1"># kwargs with arguments here, then 'conn_id' and 'sql' can be passed into the</span> + <span class="c1"># constructor below and apply_defaults will no longer throw an exception.</span> <span class="nb">super</span><span class="p">(</span><span class="n">SqlSensor</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="n">__init__</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> <span class="k">def</span> <span class="nf">poke</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span> <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">first_poke</span><span class="p">:</span> - <span class="bp">self</span><span class="o">.</span><span class="n">first_poke</span> <span class="o">=</span> <span class="bp">False</span> + <span class="bp">self</span><span class="o">.</span><span class="n">first_poke</span> <span class="o">=</span> <span class="kc">False</span> <span class="k">if</span> <span class="s1">'.'</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">table</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">table</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">table</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">'.'</span><span class="p">)</span> <span class="bp">self</span><span class="o">.</span><span class="n">sql</span> <span class="o">=</span> <span class="s2">"""</span> @@ -301,9 +334,9 @@ <span class="s2"> LEFT OUTER JOIN TBLS B0 ON A0.TBL_ID = B0.TBL_ID</span> <span class="s2"> LEFT OUTER JOIN DBS C0 ON B0.DB_ID = C0.DB_ID</span> <span class="s2"> WHERE</span> -<span class="s2"> B0.TBL_NAME = '{self.table}' AND</span> -<span class="s2"> C0.NAME = '{self.schema}' AND</span> -<span class="s2"> A0.PART_NAME = '{self.partition_name}';</span> +<span class="s2"> B0.TBL_NAME = '</span><span class="si">{self.table}</span><span class="s2">' AND</span> +<span class="s2"> C0.NAME = '</span><span class="si">{self.schema}</span><span class="s2">' AND</span> +<span class="s2"> A0.PART_NAME = '</span><span class="si">{self.partition_name}</span><span class="s2">';</span> <span class="s2"> """</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="bp">self</span><span class="o">=</span><span class="bp">self</span><span class="p">)</span> <span class="k">return</span> <span class="nb">super</span><span class="p">(</span><span class="n">MetastorePartitionSensor</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="n">poke</span><span class="p">(</span><span class="n">context</span><span class="p">)</span></div> @@ -338,13 +371,13 @@ <span class="bp">self</span><span class="p">,</span> <span class="n">external_dag_id</span><span class="p">,</span> <span class="n">external_task_id</span><span class="p">,</span> - <span class="n">allowed_states</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span> - <span class="n">execution_delta</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span> - <span class="n">execution_date_fn</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span> + <span class="n">allowed_states</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> + <span class="n">execution_delta</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> + <span class="n">execution_date_fn</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> <span class="nb">super</span><span class="p">(</span><span class="n">ExternalTaskSensor</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="n">__init__</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> <span class="bp">self</span><span class="o">.</span><span class="n">allowed_states</span> <span class="o">=</span> <span class="n">allowed_states</span> <span class="ow">or</span> <span class="p">[</span><span class="n">State</span><span class="o">.</span><span class="n">SUCCESS</span><span class="p">]</span> - <span class="k">if</span> <span class="n">execution_delta</span> <span class="ow">is</span> <span class="ow">not</span> <span class="bp">None</span> <span class="ow">and</span> <span class="n">execution_date_fn</span> <span class="ow">is</span> <span class="ow">not</span> <span class="bp">None</span><span class="p">:</span> + <span class="k">if</span> <span class="n">execution_delta</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> <span class="ow">and</span> <span class="n">execution_date_fn</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> <span class="s1">'Only one of `execution_date` or `execution_date_fn` may'</span> <span class="s1">'be provided to ExternalTaskSensor; not both.'</span><span class="p">)</span> @@ -364,9 +397,9 @@ <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span> <span class="s1">'Poking for '</span> - <span class="s1">'{self.external_dag_id}.'</span> - <span class="s1">'{self.external_task_id} on '</span> - <span class="s1">'{dttm} ... '</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="o">**</span><span class="nb">locals</span><span class="p">()))</span> + <span class="s1">'</span><span class="si">{self.external_dag_id}</span><span class="s1">.'</span> + <span class="s1">'</span><span class="si">{self.external_task_id}</span><span class="s1"> on '</span> + <span class="s1">'</span><span class="si">{dttm}</span><span class="s1"> ... '</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="o">**</span><span class="nb">locals</span><span class="p">()))</span> <span class="n">TI</span> <span class="o">=</span> <span class="n">TaskInstance</span> <span class="n">session</span> <span class="o">=</span> <span class="n">settings</span><span class="o">.</span><span class="n">Session</span><span class="p">()</span> @@ -406,22 +439,23 @@ <span class="bp">self</span><span class="p">,</span> <span class="n">partition_names</span><span class="p">,</span> <span class="n">metastore_conn_id</span><span class="o">=</span><span class="s1">'metastore_default'</span><span class="p">,</span> - <span class="n">poke_interval</span><span class="o">=</span><span class="mi">60</span><span class="o">*</span><span class="mi">3</span><span class="p">,</span> + <span class="n">poke_interval</span><span class="o">=</span><span class="mi">60</span> <span class="o">*</span> <span class="mi">3</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> <span class="nb">super</span><span class="p">(</span><span class="n">NamedHivePartitionSensor</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="n">__init__</span><span class="p">(</span> <span class="n">poke_interval</span><span class="o">=</span><span class="n">poke_interval</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> - <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">partition_names</span><span class="p">,</span> <span class="nb">basestring</span><span class="p">):</span> + <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">partition_names</span><span class="p">,</span> <span class="n">basestring</span><span class="p">):</span> <span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span><span class="s1">'partition_names must be an array of strings'</span><span class="p">)</span> <span class="bp">self</span><span class="o">.</span><span class="n">metastore_conn_id</span> <span class="o">=</span> <span class="n">metastore_conn_id</span> <span class="bp">self</span><span class="o">.</span><span class="n">partition_names</span> <span class="o">=</span> <span class="n">partition_names</span> <span class="bp">self</span><span class="o">.</span><span class="n">next_poke_idx</span> <span class="o">=</span> <span class="mi">0</span> + <span class="nd">@classmethod</span> <span class="k">def</span> <span class="nf">parse_partition_name</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">partition</span><span class="p">):</span> <span class="k">try</span><span class="p">:</span> - <span class="n">schema</span><span class="p">,</span> <span class="n">table_partition</span> <span class="o">=</span> <span class="n">partition</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">'.'</span><span class="p">)</span> + <span class="n">schema</span><span class="p">,</span> <span class="n">table_partition</span> <span class="o">=</span> <span class="n">partition</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">'.'</span><span class="p">,</span> <span class="mi">1</span><span class="p">)</span> <span class="n">table</span><span class="p">,</span> <span class="n">partition</span> <span class="o">=</span> <span class="n">table_partition</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">'/'</span><span class="p">,</span> <span class="mi">1</span><span class="p">)</span> <span class="k">return</span> <span class="n">schema</span><span class="p">,</span> <span class="n">table</span><span class="p">,</span> <span class="n">partition</span> <span class="k">except</span> <span class="ne">ValueError</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> @@ -430,7 +464,7 @@ <span class="k">def</span> <span class="nf">poke</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span> <span class="k">if</span> <span class="ow">not</span> <span class="nb">hasattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">'hook'</span><span class="p">):</span> - <span class="bp">self</span><span class="o">.</span><span class="n">hook</span> <span class="o">=</span> <span class="n">airflow</span><span class="o">.</span><span class="n">hooks</span><span class="o">.</span><span class="n">hive_hooks</span><span class="o">.</span><span class="n">HiveMetastoreHook</span><span class="p">(</span> + <span class="bp">self</span><span class="o">.</span><span class="n">hook</span> <span class="o">=</span> <span class="n">hooks</span><span class="o">.</span><span class="n">HiveMetastoreHook</span><span class="p">(</span> <span class="n">metastore_conn_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">metastore_conn_id</span><span class="p">)</span> <span class="k">def</span> <span class="nf">poke_partition</span><span class="p">(</span><span class="n">partition</span><span class="p">):</span> @@ -438,7 +472,7 @@ <span class="n">schema</span><span class="p">,</span> <span class="n">table</span><span class="p">,</span> <span class="n">partition</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">parse_partition_name</span><span class="p">(</span><span class="n">partition</span><span class="p">)</span> <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span> - <span class="s1">'Poking for {schema}.{table}/{partition}'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="o">**</span><span class="nb">locals</span><span class="p">())</span> + <span class="s1">'Poking for </span><span class="si">{schema}</span><span class="s1">.</span><span class="si">{table}</span><span class="s1">/</span><span class="si">{partition}</span><span class="s1">'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="o">**</span><span class="nb">locals</span><span class="p">())</span> <span class="p">)</span> <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">hook</span><span class="o">.</span><span class="n">check_for_named_partition</span><span class="p">(</span> <span class="n">schema</span><span class="p">,</span> <span class="n">table</span><span class="p">,</span> <span class="n">partition</span><span class="p">)</span> @@ -447,9 +481,9 @@ <span class="k">if</span> <span class="n">poke_partition</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">partition_names</span><span class="p">[</span><span class="bp">self</span><span class="o">.</span><span class="n">next_poke_idx</span><span class="p">]):</span> <span class="bp">self</span><span class="o">.</span><span class="n">next_poke_idx</span> <span class="o">+=</span> <span class="mi">1</span> <span class="k">else</span><span class="p">:</span> - <span class="k">return</span> <span class="bp">False</span> + <span class="k">return</span> <span class="kc">False</span> - <span class="k">return</span> <span class="bp">True</span></div> + <span class="k">return</span> <span class="kc">True</span></div> <div class="viewcode-block" id="HivePartitionSensor"><a class="viewcode-back" href="../code.html#airflow.operators.HivePartitionSensor">[docs]</a><span class="k">class</span> <span class="nc">HivePartitionSensor</span><span class="p">(</span><span class="n">BaseSensorOperator</span><span class="p">):</span> @@ -496,10 +530,10 @@ <span class="k">if</span> <span class="s1">'.'</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">table</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">table</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">table</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">'.'</span><span class="p">)</span> <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span> - <span class="s1">'Poking for table {self.schema}.{self.table}, '</span> - <span class="s1">'partition {self.partition}'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="o">**</span><span class="nb">locals</span><span class="p">()))</span> + <span class="s1">'Poking for table </span><span class="si">{self.schema}</span><span class="s1">.</span><span class="si">{self.table}</span><span class="s1">, '</span> + <span class="s1">'partition </span><span class="si">{self.partition}</span><span class="s1">'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="o">**</span><span class="nb">locals</span><span class="p">()))</span> <span class="k">if</span> <span class="ow">not</span> <span class="nb">hasattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">'hook'</span><span class="p">):</span> - <span class="bp">self</span><span class="o">.</span><span class="n">hook</span> <span class="o">=</span> <span class="n">airflow</span><span class="o">.</span><span class="n">hooks</span><span class="o">.</span><span class="n">hive_hooks</span><span class="o">.</span><span class="n">HiveMetastoreHook</span><span class="p">(</span> + <span class="bp">self</span><span class="o">.</span><span class="n">hook</span> <span class="o">=</span> <span class="n">hooks</span><span class="o">.</span><span class="n">HiveMetastoreHook</span><span class="p">(</span> <span class="n">metastore_conn_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">metastore_conn_id</span><span class="p">)</span> <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">hook</span><span class="o">.</span><span class="n">check_for_partition</span><span class="p">(</span> <span class="bp">self</span><span class="o">.</span><span class="n">schema</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">table</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">partition</span><span class="p">)</span></div> @@ -510,29 +544,77 @@ <span class="sd"> Waits for a file or folder to land in HDFS</span> <span class="sd"> """</span> <span class="n">template_fields</span> <span class="o">=</span> <span class="p">(</span><span class="s1">'filepath'</span><span class="p">,)</span> - <span class="n">ui_color</span> <span class="o">=</span> <span class="s1">'#4d9de0'</span> + <span class="n">ui_color</span> <span class="o">=</span> <span class="n">settings</span><span class="o">.</span><span class="n">WEB_COLORS</span><span class="p">[</span><span class="s1">'LIGHTBLUE'</span><span class="p">]</span> <span class="nd">@apply_defaults</span> <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span> <span class="bp">self</span><span class="p">,</span> <span class="n">filepath</span><span class="p">,</span> <span class="n">hdfs_conn_id</span><span class="o">=</span><span class="s1">'hdfs_default'</span><span class="p">,</span> + <span class="n">ignored_ext</span><span class="o">=</span><span class="p">[</span><span class="s1">'_COPYING_'</span><span class="p">],</span> + <span class="n">ignore_copying</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> + <span class="n">file_size</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> + <span class="n">hook</span><span class="o">=</span><span class="n">HDFSHook</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> <span class="nb">super</span><span class="p">(</span><span class="n">HdfsSensor</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="n">__init__</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> <span class="bp">self</span><span class="o">.</span><span class="n">filepath</span> <span class="o">=</span> <span class="n">filepath</span> <span class="bp">self</span><span class="o">.</span><span class="n">hdfs_conn_id</span> <span class="o">=</span> <span class="n">hdfs_conn_id</span> + <span class="bp">self</span><span class="o">.</span><span class="n">file_size</span> <span class="o">=</span> <span class="n">file_size</span> + <span class="bp">self</span><span class="o">.</span><span class="n">ignored_ext</span> <span class="o">=</span> <span class="n">ignored_ext</span> + <span class="bp">self</span><span class="o">.</span><span class="n">ignore_copying</span> <span class="o">=</span> <span class="n">ignore_copying</span> + <span class="bp">self</span><span class="o">.</span><span class="n">hook</span> <span class="o">=</span> <span class="n">hook</span> + + <span class="nd">@staticmethod</span> +<div class="viewcode-block" id="HdfsSensor.filter_for_filesize"><a class="viewcode-back" href="../code.html#airflow.operators.HdfsSensor.filter_for_filesize">[docs]</a> <span class="k">def</span> <span class="nf">filter_for_filesize</span><span class="p">(</span><span class="n">result</span><span class="p">,</span> <span class="n">size</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> + <span class="sd">"""</span> +<span class="sd"> Will test the filepath result and test if its size is at least self.filesize</span> +<span class="sd"> :param result: a list of dicts returned by Snakebite ls</span> +<span class="sd"> :param size: the file size in MB a file should be at least to trigger True</span> +<span class="sd"> :return: (bool) depending on the matching criteria</span> +<span class="sd"> """</span> + <span class="k">if</span> <span class="n">size</span><span class="p">:</span> + <span class="n">logging</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s1">'Filtering for file size >= </span><span class="si">%s</span><span class="s1"> in files: </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> <span class="n">size</span><span class="p">,</span> <span class="nb">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">x</span><span class="p">[</span><span class="s1">'path'</span><span class="p">],</span> <span class="n">result</span><span class="p">))</span> + <span class="n">size</span> <span class="o">*=</span> <span class="n">settings</span><span class="o">.</span><span class="n">MEGABYTE</span> + <span class="n">result</span> <span class="o">=</span> <span class="p">[</span><span class="n">x</span> <span class="k">for</span> <span class="n">x</span> <span class="ow">in</span> <span class="n">result</span> <span class="k">if</span> <span class="n">x</span><span class="p">[</span><span class="s1">'length'</span><span class="p">]</span> <span class="o">>=</span> <span class="n">size</span><span class="p">]</span> + <span class="n">logging</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s1">'HdfsSensor.poke: after size filter result is </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> <span class="n">result</span><span class="p">)</span> + <span class="k">return</span> <span class="n">result</span></div> + + <span class="nd">@staticmethod</span> +<div class="viewcode-block" id="HdfsSensor.filter_for_ignored_ext"><a class="viewcode-back" href="../code.html#airflow.operators.HdfsSensor.filter_for_ignored_ext">[docs]</a> <span class="k">def</span> <span class="nf">filter_for_ignored_ext</span><span class="p">(</span><span class="n">result</span><span class="p">,</span> <span class="n">ignored_ext</span><span class="p">,</span> <span class="n">ignore_copying</span><span class="p">):</span> + <span class="sd">"""</span> +<span class="sd"> Will filter if instructed to do so the result to remove matching criteria</span> +<span class="sd"> :param result: (list) of dicts returned by Snakebite ls</span> +<span class="sd"> :param ignored_ext: (list) of ignored extentions</span> +<span class="sd"> :param ignore_copying: (bool) shall we ignore ?</span> +<span class="sd"> :return:</span> +<span class="sd"> """</span> + <span class="k">if</span> <span class="n">ignore_copying</span><span class="p">:</span> + <span class="n">regex_builder</span> <span class="o">=</span> <span class="s2">"^.*\.(</span><span class="si">%s</span><span class="s2">$)$"</span> <span class="o">%</span> <span class="s1">'$|'</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">ignored_ext</span><span class="p">)</span> + <span class="n">ignored_extentions_regex</span> <span class="o">=</span> <span class="n">re</span><span class="o">.</span><span class="n">compile</span><span class="p">(</span><span class="n">regex_builder</span><span class="p">)</span> + <span class="n">logging</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s1">'Filtering result for ignored extentions: </span><span class="si">%s</span><span class="s1"> in files </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> <span class="n">ignored_extentions_regex</span><span class="o">.</span><span class="n">pattern</span><span class="p">,</span> + <span class="nb">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">x</span><span class="p">[</span><span class="s1">'path'</span><span class="p">],</span> <span class="n">result</span><span class="p">))</span> + <span class="n">result</span> <span class="o">=</span> <span class="p">[</span><span class="n">x</span> <span class="k">for</span> <span class="n">x</span> <span class="ow">in</span> <span class="n">result</span> <span class="k">if</span> <span class="ow">not</span> <span class="n">ignored_extentions_regex</span><span class="o">.</span><span class="n">match</span><span class="p">(</span><span class="n">x</span><span class="p">[</span><span class="s1">'path'</span><span class="p">])]</span> + <span class="n">logging</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s1">'HdfsSensor.poke: after ext filter result is </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> <span class="n">result</span><span class="p">)</span> + <span class="k">return</span> <span class="n">result</span></div> <span class="k">def</span> <span class="nf">poke</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span> - <span class="kn">import</span> <span class="nn">airflow.hooks.hdfs_hook</span> - <span class="n">sb</span> <span class="o">=</span> <span class="n">airflow</span><span class="o">.</span><span class="n">hooks</span><span class="o">.</span><span class="n">hdfs_hook</span><span class="o">.</span><span class="n">HDFSHook</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">hdfs_conn_id</span><span class="p">)</span><span class="o">.</span><span class="n">get_conn</span><span class="p">()</span> + <span class="n">sb</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">hook</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">hdfs_conn_id</span><span class="p">)</span><span class="o">.</span><span class="n">get_conn</span><span class="p">()</span> <span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">(</span><span class="s2">"snakebite"</span><span class="p">)</span><span class="o">.</span><span class="n">setLevel</span><span class="p">(</span><span class="n">logging</span><span class="o">.</span><span class="n">WARNING</span><span class="p">)</span> - <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span> - <span class="s1">'Poking for file {self.filepath} '</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="o">**</span><span class="nb">locals</span><span class="p">()))</span> + <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Poking for file </span><span class="si">{self.filepath}</span><span class="s1"> '</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="o">**</span><span class="nb">locals</span><span class="p">()))</span> <span class="k">try</span><span class="p">:</span> - <span class="n">files</span> <span class="o">=</span> <span class="p">[</span><span class="n">f</span> <span class="k">for</span> <span class="n">f</span> <span class="ow">in</span> <span class="n">sb</span><span class="o">.</span><span class="n">ls</span><span class="p">([</span><span class="bp">self</span><span class="o">.</span><span class="n">filepath</span><span class="p">])]</span> + <span class="c1"># IMOO it's not right here, as there no raise of any kind.</span> + <span class="c1"># if the filepath is let's say '/data/mydirectory', it's correct but if it is '/data/mydirectory/*',</span> + <span class="c1"># it's not correct as the directory exists and sb does not raise any error</span> + <span class="c1"># here is a quick fix</span> + <span class="n">result</span> <span class="o">=</span> <span class="p">[</span><span class="n">f</span> <span class="k">for</span> <span class="n">f</span> <span class="ow">in</span> <span class="n">sb</span><span class="o">.</span><span class="n">ls</span><span class="p">([</span><span class="bp">self</span><span class="o">.</span><span class="n">filepath</span><span class="p">],</span> <span class="n">include_toplevel</span><span class="o">=</span><span class="kc">False</span><span class="p">)]</span> + <span class="n">logging</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s1">'HdfsSensor.poke: result is </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> <span class="n">result</span><span class="p">)</span> + <span class="n">result</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">filter_for_ignored_ext</span><span class="p">(</span><span class="n">result</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">ignored_ext</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">ignore_copying</span><span class="p">)</span> + <span class="n">result</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">filter_for_filesize</span><span class="p">(</span><span class="n">result</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">file_size</span><span class="p">)</span> + <span class="k">return</span> <span class="nb">bool</span><span class="p">(</span><span class="n">result</span><span class="p">)</span> <span class="k">except</span><span class="p">:</span> - <span class="k">return</span> <span class="bp">False</span> - <span class="k">return</span> <span class="bp">True</span></div> + <span class="n">e</span> <span class="o">=</span> <span class="n">sys</span><span class="o">.</span><span class="n">exc_info</span><span class="p">()</span> + <span class="n">logging</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s2">"Caught an exception !: </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> <span class="nb">str</span><span class="p">(</span><span class="n">e</span><span class="p">))</span> + <span class="k">return</span> <span class="kc">False</span></div> <div class="viewcode-block" id="WebHdfsSensor"><a class="viewcode-back" href="../code.html#airflow.operators.WebHdfsSensor">[docs]</a><span class="k">class</span> <span class="nc">WebHdfsSensor</span><span class="p">(</span><span class="n">BaseSensorOperator</span><span class="p">):</span> @@ -554,7 +636,7 @@ <span class="k">def</span> <span class="nf">poke</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span> <span class="n">c</span> <span class="o">=</span> <span class="n">airflow</span><span class="o">.</span><span class="n">hooks</span><span class="o">.</span><span class="n">webhdfs_hook</span><span class="o">.</span><span class="n">WebHDFSHook</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">webhdfs_conn_id</span><span class="p">)</span> <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span> - <span class="s1">'Poking for file {self.filepath} '</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="o">**</span><span class="nb">locals</span><span class="p">()))</span> + <span class="s1">'Poking for file </span><span class="si">{self.filepath}</span><span class="s1"> '</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="o">**</span><span class="nb">locals</span><span class="p">()))</span> <span class="k">return</span> <span class="n">c</span><span class="o">.</span><span class="n">check_for_path</span><span class="p">(</span><span class="n">hdfs_path</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">filepath</span><span class="p">)</span></div> @@ -580,17 +662,13 @@ <span class="nd">@apply_defaults</span> <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span> <span class="bp">self</span><span class="p">,</span> <span class="n">bucket_key</span><span class="p">,</span> - <span class="n">bucket_name</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span> - <span class="n">wildcard_match</span><span class="o">=</span><span class="bp">False</span><span class="p">,</span> + <span class="n">bucket_name</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> + <span class="n">wildcard_match</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> <span class="n">s3_conn_id</span><span class="o">=</span><span class="s1">'s3_default'</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> <span class="nb">super</span><span class="p">(</span><span class="n">S3KeySensor</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="n">__init__</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> - <span class="n">session</span> <span class="o">=</span> <span class="n">settings</span><span class="o">.</span><span class="n">Session</span><span class="p">()</span> - <span class="n">db</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">DB</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">DB</span><span class="o">.</span><span class="n">conn_id</span> <span class="o">==</span> <span class="n">s3_conn_id</span><span class="p">)</span><span class="o">.</span><span class="n">first</span><span class="p">()</span> - <span class="k">if</span> <span class="ow">not</span> <span class="n">db</span><span class="p">:</span> - <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span><span class="s2">"conn_id doesn't exist in the repository"</span><span class="p">)</span> <span class="c1"># Parse</span> - <span class="k">if</span> <span class="n">bucket_name</span> <span class="ow">is</span> <span class="bp">None</span><span class="p">:</span> + <span class="k">if</span> <span class="n">bucket_name</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> <span class="n">parsed_url</span> <span class="o">=</span> <span class="n">urlparse</span><span class="p">(</span><span class="n">bucket_key</span><span class="p">)</span> <span class="k">if</span> <span class="n">parsed_url</span><span class="o">.</span><span class="n">netloc</span> <span class="o">==</span> <span class="s1">''</span><span class="p">:</span> <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span><span class="s1">'Please provide a bucket_name'</span><span class="p">)</span> @@ -604,14 +682,12 @@ <span class="bp">self</span><span class="o">.</span><span class="n">bucket_key</span> <span class="o">=</span> <span class="n">bucket_key</span> <span class="bp">self</span><span class="o">.</span><span class="n">wildcard_match</span> <span class="o">=</span> <span class="n">wildcard_match</span> <span class="bp">self</span><span class="o">.</span><span class="n">s3_conn_id</span> <span class="o">=</span> <span class="n">s3_conn_id</span> - <span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span> - <span class="n">session</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> <span class="k">def</span> <span class="nf">poke</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span> <span class="kn">import</span> <span class="nn">airflow.hooks.S3_hook</span> <span class="n">hook</span> <span class="o">=</span> <span class="n">airflow</span><span class="o">.</span><span class="n">hooks</span><span class="o">.</span><span class="n">S3_hook</span><span class="o">.</span><span class="n">S3Hook</span><span class="p">(</span><span class="n">s3_conn_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">s3_conn_id</span><span class="p">)</span> <span class="n">full_url</span> <span class="o">=</span> <span class="s2">"s3://"</span> <span class="o">+</span> <span class="bp">self</span><span class="o">.</span><span class="n">bucket_name</span> <span class="o">+</span> <span class="s2">"/"</span> <span class="o">+</span> <span class="bp">self</span><span class="o">.</span><span class="n">bucket_key</span> - <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Poking for key : {full_url}'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="o">**</span><span class="nb">locals</span><span class="p">()))</span> + <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Poking for key : </span><span class="si">{full_url}</span><span class="s1">'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="o">**</span><span class="nb">locals</span><span class="p">()))</span> <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">wildcard_match</span><span class="p">:</span> <span class="k">return</span> <span class="n">hook</span><span class="o">.</span><span class="n">check_for_wildcard_key</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">bucket_key</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">bucket_name</span><span class="p">)</span> @@ -645,22 +721,16 @@ <span class="n">s3_conn_id</span><span class="o">=</span><span class="s1">'s3_default'</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> <span class="nb">super</span><span class="p">(</span><span class="n">S3PrefixSensor</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="n">__init__</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> - <span class="n">session</span> <span class="o">=</span> <span class="n">settings</span><span class="o">.</span><span class="n">Session</span><span class="p">()</span> - <span class="n">db</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">DB</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">DB</span><span class="o">.</span><span class="n">conn_id</span> <span class="o">==</span> <span class="n">s3_conn_id</span><span class="p">)</span><span class="o">.</span><span class="n">first</span><span class="p">()</span> - <span class="k">if</span> <span class="ow">not</span> <span class="n">db</span><span class="p">:</span> - <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span><span class="s2">"conn_id doesn't exist in the repository"</span><span class="p">)</span> <span class="c1"># Parse</span> <span class="bp">self</span><span class="o">.</span><span class="n">bucket_name</span> <span class="o">=</span> <span class="n">bucket_name</span> <span class="bp">self</span><span class="o">.</span><span class="n">prefix</span> <span class="o">=</span> <span class="n">prefix</span> <span class="bp">self</span><span class="o">.</span><span class="n">delimiter</span> <span class="o">=</span> <span class="n">delimiter</span> <span class="bp">self</span><span class="o">.</span><span class="n">full_url</span> <span class="o">=</span> <span class="s2">"s3://"</span> <span class="o">+</span> <span class="n">bucket_name</span> <span class="o">+</span> <span class="s1">'/'</span> <span class="o">+</span> <span class="n">prefix</span> <span class="bp">self</span><span class="o">.</span><span class="n">s3_conn_id</span> <span class="o">=</span> <span class="n">s3_conn_id</span> - <span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span> - <span class="n">session</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> <span class="k">def</span> <span class="nf">poke</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span> - <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Poking for prefix : {self.prefix}</span><span class="se">\n</span><span class="s1">'</span> - <span class="s1">'in bucket s3://{self.bucket_name}'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="o">**</span><span class="nb">locals</span><span class="p">()))</span> + <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Poking for prefix : </span><span class="si">{self.prefix}</span><span class="se">\n</span><span class="s1">'</span> + <span class="s1">'in bucket s3://</span><span class="si">{self.bucket_name}</span><span class="s1">'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="o">**</span><span class="nb">locals</span><span class="p">()))</span> <span class="kn">import</span> <span class="nn">airflow.hooks.S3_hook</span> <span class="n">hook</span> <span class="o">=</span> <span class="n">airflow</span><span class="o">.</span><span class="n">hooks</span><span class="o">.</span><span class="n">S3_hook</span><span class="o">.</span><span class="n">S3Hook</span><span class="p">(</span><span class="n">s3_conn_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">s3_conn_id</span><span class="p">)</span> <span class="k">return</span> <span class="n">hook</span><span class="o">.</span><span class="n">check_for_prefix</span><span class="p">(</span> @@ -685,7 +755,7 @@ <span class="k">def</span> <span class="nf">poke</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span> <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span> - <span class="s1">'Checking if the time ({0}) has come'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">target_time</span><span class="p">))</span> + <span class="s1">'Checking if the time (</span><span class="si">{0}</span><span class="s1">) has come'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">target_time</span><span class="p">))</span> <span class="k">return</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span><span class="o">.</span><span class="n">time</span><span class="p">()</span> <span class="o">></span> <span class="bp">self</span><span class="o">.</span><span class="n">target_time</span></div> @@ -710,7 +780,7 @@ <span class="n">dag</span> <span class="o">=</span> <span class="n">context</span><span class="p">[</span><span class="s1">'dag'</span><span class="p">]</span> <span class="n">target_dttm</span> <span class="o">=</span> <span class="n">dag</span><span class="o">.</span><span class="n">following_schedule</span><span class="p">(</span><span class="n">context</span><span class="p">[</span><span class="s1">'execution_date'</span><span class="p">])</span> <span class="n">target_dttm</span> <span class="o">+=</span> <span class="bp">self</span><span class="o">.</span><span class="n">delta</span> - <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Checking if the time ({0}) has come'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">target_dttm</span><span class="p">))</span> + <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Checking if the time (</span><span class="si">{0}</span><span class="s1">) has come'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">target_dttm</span><span class="p">))</span> <span class="k">return</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span> <span class="o">></span> <span class="n">target_dttm</span> @@ -736,16 +806,16 @@ <span class="sd"> depends on the option that's being modified.</span> <span class="sd"> """</span> - <span class="n">template_fields</span> <span class="o">=</span> <span class="p">(</span><span class="s1">'endpoint'</span><span class="p">,)</span> + <span class="n">template_fields</span> <span class="o">=</span> <span class="p">(</span><span class="s1">'endpoint'</span><span class="p">,</span> <span class="s1">'params'</span><span class="p">)</span> <span class="nd">@apply_defaults</span> <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">endpoint</span><span class="p">,</span> <span class="n">http_conn_id</span><span class="o">=</span><span class="s1">'http_default'</span><span class="p">,</span> - <span class="n">params</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span> - <span class="n">headers</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span> - <span class="n">response_check</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span> - <span class="n">extra_options</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> + <span class="n">params</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> + <span class="n">headers</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> + <span class="n">response_check</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> + <span class="n">extra_options</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> <span class="nb">super</span><span class="p">(</span><span class="n">HttpSensor</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="n">__init__</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> <span class="bp">self</span><span class="o">.</span><span class="n">endpoint</span> <span class="o">=</span> <span class="n">endpoint</span> <span class="bp">self</span><span class="o">.</span><span class="n">http_conn_id</span> <span class="o">=</span> <span class="n">http_conn_id</span> @@ -768,14 +838,17 @@ <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">response_check</span><span class="p">(</span><span class="n">response</span><span class="p">)</span> <span class="k">except</span> <span class="n">AirflowException</span> <span class="k">as</span> <span class="n">ae</span><span class="p">:</span> <span class="k">if</span> <span class="nb">str</span><span class="p">(</span><span class="n">ae</span><span class="p">)</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="s2">"404"</span><span class="p">):</span> - <span class="k">return</span> <span class="bp">False</span> + <span class="k">return</span> <span class="kc">False</span> <span class="k">raise</span> <span class="n">ae</span> - <span class="k">return</span> <span class="bp">True</span></div> + <span class="k">return</span> <span class="kc">True</span></div> </pre></div> </div> + <div class="articleComments"> + + </div> </div> <footer> @@ -808,7 +881,8 @@ VERSION:'', COLLAPSE_INDEX:false, FILE_SUFFIX:'.html', - HAS_SOURCE: true + HAS_SOURCE: true, + SOURCELINK_SUFFIX: '.txt' }; </script> <script type="text/javascript" src="../_static/jquery.js"></script>
http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/blob/9c75ee9e/_modules/sqlite_hook.html ---------------------------------------------------------------------- diff --git a/_modules/sqlite_hook.html b/_modules/sqlite_hook.html index 70abb8c..dc33f36 100644 --- a/_modules/sqlite_hook.html +++ b/_modules/sqlite_hook.html @@ -30,6 +30,9 @@ + <link rel="index" title="Index" + href="../genindex.html"/> + <link rel="search" title="Search" href="../search.html"/> <link rel="top" title="Airflow Documentation" href="../index.html"/> <link rel="up" title="Module code" href="index.html"/> @@ -40,6 +43,7 @@ <body class="wy-body-for-nav" role="document"> + <div class="wy-grid-for-nav"> @@ -90,6 +94,8 @@ <li class="toctree-l1"><a class="reference internal" href="../scheduler.html">Scheduling & Triggers</a></li> <li class="toctree-l1"><a class="reference internal" href="../plugins.html">Plugins</a></li> <li class="toctree-l1"><a class="reference internal" href="../security.html">Security</a></li> +<li class="toctree-l1"><a class="reference internal" href="../api.html">Experimental Rest API</a></li> +<li class="toctree-l1"><a class="reference internal" href="../integration.html">Integration</a></li> <li class="toctree-l1"><a class="reference internal" href="../faq.html">FAQ</a></li> <li class="toctree-l1"><a class="reference internal" href="../code.html">API Reference</a></li> </ul> @@ -104,8 +110,10 @@ <nav class="wy-nav-top" role="navigation" aria-label="top navigation"> - <i data-toggle="wy-nav-top" class="fa fa-bars"></i> - <a href="../index.html">Airflow</a> + + <i data-toggle="wy-nav-top" class="fa fa-bars"></i> + <a href="../index.html">Airflow</a> + </nav> @@ -118,19 +126,36 @@ + + + + + + + + + + <div role="navigation" aria-label="breadcrumbs navigation"> + <ul class="wy-breadcrumbs"> - <li><a href="../index.html">Docs</a> »</li> - + + <li><a href="../index.html">Docs</a> »</li> + <li><a href="index.html">Module code</a> »</li> - - <li>sqlite_hook</li> + + <li>sqlite_hook</li> + + <li class="wy-breadcrumbs-aside"> - + </li> + </ul> + + <hr/> </div> <div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article"> @@ -153,7 +178,7 @@ <span class="kn">import</span> <span class="nn">sqlite3</span> -<span class="kn">from</span> <span class="nn">airflow.hooks.dbapi_hook</span> <span class="kn">import</span> <span class="n">DbApiHook</span> +<span class="kn">from</span> <span class="nn">airflow.hooks.dbapi_hook</span> <span class="k">import</span> <span class="n">DbApiHook</span> <div class="viewcode-block" id="SqliteHook"><a class="viewcode-back" href="../code.html#airflow.hooks.SqliteHook">[docs]</a><span class="k">class</span> <span class="nc">SqliteHook</span><span class="p">(</span><span class="n">DbApiHook</span><span class="p">):</span> @@ -164,7 +189,7 @@ <span class="n">conn_name_attr</span> <span class="o">=</span> <span class="s1">'sqlite_conn_id'</span> <span class="n">default_conn_name</span> <span class="o">=</span> <span class="s1">'sqlite_default'</span> - <span class="n">supports_autocommit</span> <span class="o">=</span> <span class="bp">False</span> + <span class="n">supports_autocommit</span> <span class="o">=</span> <span class="kc">False</span> <div class="viewcode-block" id="SqliteHook.get_conn"><a class="viewcode-back" href="../code.html#airflow.hooks.SqliteHook.get_conn">[docs]</a> <span class="k">def</span> <span class="nf">get_conn</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> <span class="sd">"""</span> @@ -176,6 +201,9 @@ </pre></div> </div> + <div class="articleComments"> + + </div> </div> <footer> @@ -208,7 +236,8 @@ VERSION:'', COLLAPSE_INDEX:false, FILE_SUFFIX:'.html', - HAS_SOURCE: true + HAS_SOURCE: true, + SOURCELINK_SUFFIX: '.txt' }; </script> <script type="text/javascript" src="../_static/jquery.js"></script> http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/blob/9c75ee9e/_modules/ssh_execute_operator.html ---------------------------------------------------------------------- diff --git a/_modules/ssh_execute_operator.html b/_modules/ssh_execute_operator.html index c4c952d..27834e2 100644 --- a/_modules/ssh_execute_operator.html +++ b/_modules/ssh_execute_operator.html @@ -30,6 +30,9 @@ + <link rel="index" title="Index" + href="../genindex.html"/> + <link rel="search" title="Search" href="../search.html"/> <link rel="top" title="Airflow Documentation" href="../index.html"/> <link rel="up" title="Module code" href="index.html"/> @@ -40,6 +43,7 @@ <body class="wy-body-for-nav" role="document"> + <div class="wy-grid-for-nav"> @@ -90,6 +94,8 @@ <li class="toctree-l1"><a class="reference internal" href="../scheduler.html">Scheduling & Triggers</a></li> <li class="toctree-l1"><a class="reference internal" href="../plugins.html">Plugins</a></li> <li class="toctree-l1"><a class="reference internal" href="../security.html">Security</a></li> +<li class="toctree-l1"><a class="reference internal" href="../api.html">Experimental Rest API</a></li> +<li class="toctree-l1"><a class="reference internal" href="../integration.html">Integration</a></li> <li class="toctree-l1"><a class="reference internal" href="../faq.html">FAQ</a></li> <li class="toctree-l1"><a class="reference internal" href="../code.html">API Reference</a></li> </ul> @@ -104,8 +110,10 @@ <nav class="wy-nav-top" role="navigation" aria-label="top navigation"> - <i data-toggle="wy-nav-top" class="fa fa-bars"></i> - <a href="../index.html">Airflow</a> + + <i data-toggle="wy-nav-top" class="fa fa-bars"></i> + <a href="../index.html">Airflow</a> + </nav> @@ -118,19 +126,36 @@ + + + + + + + + + + <div role="navigation" aria-label="breadcrumbs navigation"> + <ul class="wy-breadcrumbs"> - <li><a href="../index.html">Docs</a> »</li> - + + <li><a href="../index.html">Docs</a> »</li> + <li><a href="index.html">Module code</a> »</li> - - <li>ssh_execute_operator</li> + + <li>ssh_execute_operator</li> + + <li class="wy-breadcrumbs-aside"> - + </li> + </ul> + + <hr/> </div> <div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article"> @@ -151,14 +176,14 @@ <span class="c1"># See the License for the specific language governing permissions and</span> <span class="c1"># limitations under the License.</span> -<span class="kn">from</span> <span class="nn">builtins</span> <span class="kn">import</span> <span class="nb">bytes</span> +<span class="kn">from</span> <span class="nn">builtins</span> <span class="k">import</span> <span class="nb">bytes</span> <span class="kn">import</span> <span class="nn">logging</span> <span class="kn">import</span> <span class="nn">subprocess</span> -<span class="kn">from</span> <span class="nn">subprocess</span> <span class="kn">import</span> <span class="n">STDOUT</span> +<span class="kn">from</span> <span class="nn">subprocess</span> <span class="k">import</span> <span class="n">STDOUT</span> -<span class="kn">from</span> <span class="nn">airflow.models</span> <span class="kn">import</span> <span class="n">BaseOperator</span> -<span class="kn">from</span> <span class="nn">airflow.utils.decorators</span> <span class="kn">import</span> <span class="n">apply_defaults</span> -<span class="kn">from</span> <span class="nn">airflow.exceptions</span> <span class="kn">import</span> <span class="n">AirflowException</span> +<span class="kn">from</span> <span class="nn">airflow.models</span> <span class="k">import</span> <span class="n">BaseOperator</span> +<span class="kn">from</span> <span class="nn">airflow.utils.decorators</span> <span class="k">import</span> <span class="n">apply_defaults</span> +<span class="kn">from</span> <span class="nn">airflow.exceptions</span> <span class="k">import</span> <span class="n">AirflowException</span> <span class="k">class</span> <span class="nc">SSHTempFileContent</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span> @@ -177,9 +202,9 @@ <span class="sd"> :param ssh_hook: A SSHHook that indicates a remote host</span> <span class="sd"> where you want to create tempfile</span> -<span class="sd"> :param content: Initial content of creating temprary file</span> +<span class="sd"> :param content: Initial content of creating temporary file</span> <span class="sd"> :type content: string</span> -<span class="sd"> :param prefix: The prefix string you want to use for the temprary file</span> +<span class="sd"> :param prefix: The prefix string you want to use for the temporary file</span> <span class="sd"> :type prefix: string</span> <span class="sd"> """</span> @@ -194,7 +219,7 @@ <span class="n">prefix</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_prefix</span> <span class="n">pmktemp</span> <span class="o">=</span> <span class="n">ssh_hook</span><span class="o">.</span><span class="n">Popen</span><span class="p">([</span><span class="s2">"-q"</span><span class="p">,</span> - <span class="s2">"mktemp"</span><span class="p">,</span> <span class="s2">"--tmpdir"</span><span class="p">,</span> <span class="n">prefix</span> <span class="o">+</span> <span class="s2">"_XXXXXX"</span><span class="p">],</span> + <span class="s2">"mktemp"</span><span class="p">,</span> <span class="s2">"-t"</span><span class="p">,</span> <span class="n">prefix</span> <span class="o">+</span> <span class="s2">"_XXXXXX"</span><span class="p">],</span> <span class="n">stdout</span><span class="o">=</span><span class="n">subprocess</span><span class="o">.</span><span class="n">PIPE</span><span class="p">,</span> <span class="n">stderr</span><span class="o">=</span><span class="n">STDOUT</span><span class="p">)</span> <span class="n">tempfile</span> <span class="o">=</span> <span class="n">pmktemp</span><span class="o">.</span><span class="n">communicate</span><span class="p">()[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">rstrip</span><span class="p">()</span> @@ -221,7 +246,7 @@ <span class="n">sp</span><span class="o">.</span><span class="n">wait</span><span class="p">()</span> <span class="k">if</span> <span class="n">sp</span><span class="o">.</span><span class="n">returncode</span><span class="p">:</span> <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span><span class="s2">"Failed to remove to remote temp file"</span><span class="p">)</span> - <span class="k">return</span> <span class="bp">False</span> + <span class="k">return</span> <span class="kc">False</span> <div class="viewcode-block" id="SSHExecuteOperator"><a class="viewcode-back" href="../code.html#airflow.contrib.operators.SSHExecuteOperator">[docs]</a><span class="k">class</span> <span class="nc">SSHExecuteOperator</span><span class="p">(</span><span class="n">BaseOperator</span><span class="p">):</span> @@ -248,8 +273,8 @@ <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">ssh_hook</span><span class="p">,</span> <span class="n">bash_command</span><span class="p">,</span> - <span class="n">xcom_push</span><span class="o">=</span><span class="bp">False</span><span class="p">,</span> - <span class="n">env</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span> + <span class="n">xcom_push</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> + <span class="n">env</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> <span class="nb">super</span><span class="p">(</span><span class="n">SSHExecuteOperator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="n">__init__</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> <span class="bp">self</span><span class="o">.</span><span class="n">bash_command</span> <span class="o">=</span> <span class="n">bash_command</span> @@ -266,8 +291,10 @@ <span class="bp">self</span><span class="o">.</span><span class="n">bash_command</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">)</span> <span class="k">as</span> <span class="n">remote_file_path</span><span class="p">:</span> <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Temporary script "</span> - <span class="s2">"location : {0}:{1}"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">host</span><span class="p">,</span> <span class="n">remote_file_path</span><span class="p">))</span> + <span class="s2">"location : </span><span class="si">{0}</span><span class="s2">:</span><span class="si">{1}</span><span class="s2">"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">host</span><span class="p">,</span> <span class="n">remote_file_path</span><span class="p">))</span> <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Running command: "</span> <span class="o">+</span> <span class="n">bash_command</span><span class="p">)</span> + <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">env</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> + <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"env: "</span> <span class="o">+</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">env</span><span class="p">))</span> <span class="n">sp</span> <span class="o">=</span> <span class="n">hook</span><span class="o">.</span><span class="n">Popen</span><span class="p">(</span> <span class="p">[</span><span class="s1">'-q'</span><span class="p">,</span> <span class="s1">'bash'</span><span class="p">,</span> <span class="n">remote_file_path</span><span class="p">],</span> @@ -279,11 +306,11 @@ <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Output:"</span><span class="p">)</span> <span class="n">line</span> <span class="o">=</span> <span class="s1">''</span> <span class="k">for</span> <span class="n">line</span> <span class="ow">in</span> <span class="nb">iter</span><span class="p">(</span><span class="n">sp</span><span class="o">.</span><span class="n">stdout</span><span class="o">.</span><span class="n">readline</span><span class="p">,</span> <span class="n">b</span><span class="s1">''</span><span class="p">):</span> - <span class="n">line</span> <span class="o">=</span> <span class="n">line</span><span class="o">.</span><span class="n">decode</span><span class="p">()</span><span class="o">.</span><span class="n">strip</span><span class="p">()</span> + <span class="n">line</span> <span class="o">=</span> <span class="n">line</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="s1">'utf_8'</span><span class="p">)</span><span class="o">.</span><span class="n">strip</span><span class="p">()</span> <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="n">line</span><span class="p">)</span> <span class="n">sp</span><span class="o">.</span><span class="n">wait</span><span class="p">()</span> <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Command exited with "</span> - <span class="s2">"return code {0}"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">sp</span><span class="o">.</span><span class="n">returncode</span><span class="p">))</span> + <span class="s2">"return code </span><span class="si">{0}</span><span class="s2">"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">sp</span><span class="o">.</span><span class="n">returncode</span><span class="p">))</span> <span class="k">if</span> <span class="n">sp</span><span class="o">.</span><span class="n">returncode</span><span class="p">:</span> <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span><span class="s2">"Bash command failed"</span><span class="p">)</span> <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">xcom_push</span><span class="p">:</span> @@ -297,6 +324,9 @@ </pre></div> </div> + <div class="articleComments"> + + </div> </div> <footer> @@ -329,7 +359,8 @@ VERSION:'', COLLAPSE_INDEX:false, FILE_SUFFIX:'.html', - HAS_SOURCE: true + HAS_SOURCE: true, + SOURCELINK_SUFFIX: '.txt' }; </script> <script type="text/javascript" src="../_static/jquery.js"></script>