ashb commented on a change in pull request #2450: [Airflow-1413] Fix FTPSensor 
failing on error message with unexpected text.
URL: https://github.com/apache/incubator-airflow/pull/2450#discussion_r240663995
 
 

 ##########
 File path: airflow/contrib/sensors/ftp_sensor.py
 ##########
 @@ -26,33 +28,64 @@
 class FTPSensor(BaseSensorOperator):
     """
     Waits for a file or directory to be present on FTP.
-
-    :param path: Remote file or directory path
-    :type path: str
-    :param ftp_conn_id: The connection to run the sensor against
-    :type ftp_conn_id: str
     """
+
     template_fields = ('path',)
 
+    """Errors that are transient in nature, and where action can be retried"""
+    transient_errors = [421, 425, 426, 434, 450, 451, 452]
+
+    error_code_pattern = re.compile("([\d]+)")
+
     @apply_defaults
-    def __init__(self, path, ftp_conn_id='ftp_default', *args, **kwargs):
+    def __init__(
+            self,
+            path,
+            ftp_conn_id='ftp_default',
+            fail_on_transient_errors=True,
+            *args,
+            **kwargs):
+        """
+        Create a new FTP sensor
+        :param path: Remote file or directory path
+        :type path: str
+        :param fail_on_transient_errors: Fail on all errors,
+            including 4xx transient errors. Default True.
+        :type fail_on_transient_errors: bool
+        :param ftp_conn_id: The connection to run the sensor against
+        :type ftp_conn_id: str
+        """
+
         super(FTPSensor, self).__init__(*args, **kwargs)
 
         self.path = path
         self.ftp_conn_id = ftp_conn_id
+        self.fail_on_transient_errors = fail_on_transient_errors
 
     def _create_hook(self):
         """Return connection hook."""
         return FTPHook(ftp_conn_id=self.ftp_conn_id)
 
+    def _get_error_code(self, e):
+        """Extract error code from ftp exception"""
+        try:
+            matches = self.error_code_pattern.match(str(e))
+            code = int(matches.group(0))
+            return code
+        except ValueError:
+            return None
+
     def poke(self, context):
         with self._create_hook() as hook:
             self.log.info('Poking for %s', self.path)
             try:
                 hook.get_mod_time(self.path)
             except ftplib.error_perm as e:
-                error = str(e).split(None, 1)
-                if error[1] != "Can't check for file existence":
+                logging.info('Ftp error encountered: %s', str(e))
 
 Review comment:
   ```suggestion
                   self.log.info('Ftp error encountered: %s', str(e))
   ```
   
   (And remove the `import logging` line at the top of the file.)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to