vincbeck commented on code in PR #22737:
URL: https://github.com/apache/airflow/pull/22737#discussion_r844082410


##########
airflow/providers/amazon/aws/sensors/s3.py:
##########
@@ -78,27 +80,32 @@ def __init__(
     ):
         super().__init__(**kwargs)
         self.bucket_name = bucket_name
-        self.bucket_key = bucket_key
+        self.bucket_key = [bucket_key] if isinstance(bucket_key, str) else 
bucket_key
         self.wildcard_match = wildcard_match
         self.aws_conn_id = aws_conn_id
         self.verify = verify
         self.hook: Optional[S3Hook] = None
 
-    def _resolve_bucket_and_key(self):
+    def _resolve_bucket_and_key(self, key):
         """If key is URI, parse bucket"""
         if self.bucket_name is None:
-            self.bucket_name, self.bucket_key = 
S3Hook.parse_s3_url(self.bucket_key)
+            return S3Hook.parse_s3_url(key)
         else:
-            parsed_url = urlparse(self.bucket_key)
+            parsed_url = urlparse(key)
             if parsed_url.scheme != '' or parsed_url.netloc != '':
                 raise AirflowException('If bucket_name provided, bucket_key 
must be relative path, not URI.')
+            return self.bucket_name, key
 
-    def poke(self, context: 'Context'):
-        self._resolve_bucket_and_key()
-        self.log.info('Poking for key : s3://%s/%s', self.bucket_name, 
self.bucket_key)
+    def _key_exists(self, key):
+        bucket_name, key = self._resolve_bucket_and_key(key)
+        self.log.info('Poking for key : s3://%s/%s', bucket_name, key)
         if self.wildcard_match:
-            return self.get_hook().check_for_wildcard_key(self.bucket_key, 
self.bucket_name)
-        return self.get_hook().check_for_key(self.bucket_key, self.bucket_name)
+            return self.get_hook().check_for_wildcard_key(key, bucket_name)
+
+        return self.get_hook().check_for_key(key, bucket_name)
+
+    def poke(self, context: 'Context'):
+        return all(self._key_exists(key) for key in self.bucket_key)

Review Comment:
   > Also if I poke for bucket_key=['a','b'] and 'a' is present but 'b' is not 
why should I keep poke for a?
   
   Bouncing back on this question. Actually, having file 'a' present at time N, 
it does not mean file 'a' is still present at time N+1. So I dont think we can 
really optimize here. Example: we are waiting for file 'a' and 'b' in a same 
bucket.
   - File 'a' gets created
   - File 'a' gets removed
   - File 'b' gets created
   
   In this situation, the sensor should never stop waiting. If we go down your 
optimization path, the sensor will act as though the 2 files are present



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to