eladkal commented on code in PR #22737: URL: https://github.com/apache/airflow/pull/22737#discussion_r843496142
########## airflow/providers/amazon/aws/sensors/s3.py: ########## @@ -40,15 +41,16 @@ class S3KeySensor(BaseSensorOperator): """ - Waits for a key (a file-like instance on S3) to be present in a S3 bucket. + Waits for one or multiple keys (a file-like instance on S3) to be present in a S3 bucket. Review Comment: can you also make this edit in `S3KeySizeSensor` ? ########## airflow/providers/amazon/aws/sensors/s3.py: ########## @@ -78,27 +80,32 @@ def __init__( ): super().__init__(**kwargs) self.bucket_name = bucket_name - self.bucket_key = bucket_key + self.bucket_key = [bucket_key] if isinstance(bucket_key, str) else bucket_key self.wildcard_match = wildcard_match self.aws_conn_id = aws_conn_id self.verify = verify self.hook: Optional[S3Hook] = None - def _resolve_bucket_and_key(self): + def _resolve_bucket_and_key(self, key): """If key is URI, parse bucket""" if self.bucket_name is None: - self.bucket_name, self.bucket_key = S3Hook.parse_s3_url(self.bucket_key) + return S3Hook.parse_s3_url(key) else: - parsed_url = urlparse(self.bucket_key) + parsed_url = urlparse(key) if parsed_url.scheme != '' or parsed_url.netloc != '': raise AirflowException('If bucket_name provided, bucket_key must be relative path, not URI.') + return self.bucket_name, key - def poke(self, context: 'Context'): - self._resolve_bucket_and_key() - self.log.info('Poking for key : s3://%s/%s', self.bucket_name, self.bucket_key) + def _key_exists(self, key): + bucket_name, key = self._resolve_bucket_and_key(key) + self.log.info('Poking for key : s3://%s/%s', bucket_name, key) if self.wildcard_match: - return self.get_hook().check_for_wildcard_key(self.bucket_key, self.bucket_name) - return self.get_hook().check_for_key(self.bucket_key, self.bucket_name) + return self.get_hook().check_for_wildcard_key(key, bucket_name) + + return self.get_hook().check_for_key(key, bucket_name) + + def poke(self, context: 'Context'): + return all(self._key_exists(key) for key in self.bucket_key) Review Comment: What will happen if I check for 300 keys? If I get it right this will make 300 API calls per poke? If so I think it's not good. Also if I poke for `bucket_key=['a','b']` and 'a' is present but 'b' is not why should I keep poke for `a`? ########## airflow/providers/amazon/aws/sensors/s3.py: ########## @@ -166,13 +173,13 @@ def poke(self, context: 'Context'): def get_files(self, s3_hook: S3Hook, delimiter: Optional[str] = '/') -> List: """Gets a list of files in the bucket""" - prefix = self.bucket_key + prefix = self.bucket_key[0] Review Comment: can you explain this change? ########## tests/providers/amazon/aws/sensors/test_s3_prefix.py: ########## @@ -1,63 +0,0 @@ -# Review Comment: I'm not sure if we can remove this test file just yet. This is because the `S3PrefixSensor` while deprecated still have logic that converts the user given parameter to what `S3KeySensor` expect thus we do want to make sure all of the tests in this file are working since we are doing it in a backward compatible way. I would leave this test file as is + add a new test to this file that make sure deprecation warning is raised when using `S3PrefixSensor`. This test file can be removed when we remove `S3PrefixSensor` in the next Major release of the provider. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org