This is an automated email from the ASF dual-hosted git repository. johncasey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 3c93f7d17ba Feature/add warnings to file watchers (#28206)
3c93f7d17ba is described below

commit 3c93f7d17ba2f1b7e7324dbad441bdb241b1e724
Author: johnjcasey <95318300+johnjca...@users.noreply.github.com>
AuthorDate: Fri Sep 1 10:00:55 2023 -0400

    Feature/add warnings to file watchers (#28206)

    * Update 2.50 release notes to include new Kafka topicPattern feature

    * Add warning for using match continuously

    * add match continuously warning to python as well

    * fix lint errors

    * fix formatting again
---
 .../core/src/main/java/org/apache/beam/sdk/io/FileIO.java | 9 +++++++++
 sdks/python/apache_beam/io/fileio.py | 12 ++++++++++++
 2 files changed, 21 insertions(+)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
index 5f319706e9e..2d28279f90b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
@@ -512,9 +512,18 @@ public class FileIO {
      * the watching frequency given by the {@code interval}. The pipeline will throw a {@code
      * RuntimeError} if timestamp extraction for the matched file has failed, suggesting the
      * timestamp metadata is not available with the IO connector.
+     *
+     * <p>Matching continuously scales poorly, as it is stateful, and requires storing file ids in
+     * memory. In addition, because it is memory-only, if a pipeline is restarted, already processed
+     * files will be reprocessed. Consider an alternate technique, such as <a
+     * href="https://cloud.google.com/storage/docs/pubsub-notifications">Pub/Sub Notifications</a>
+     * when using GCS if possible.
      */
     public MatchConfiguration continuously(
         Duration interval, TerminationCondition<String, ?> condition, boolean matchUpdatedFiles) {
+      LOG.warn(
+          "Matching Continuously is stateful, and can scale poorly. Consider using Pub/Sub "
+              + "Notifications (https://cloud.google.com/storage/docs/pubsub-notifications) if possible");
       return toBuilder()
           .setWatchInterval(interval)
           .setWatchTerminationCondition(condition)
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index a9297ca3d7a..23e979b44ca 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -266,6 +266,13 @@ class MatchContinuously(beam.PTransform):

   MatchContinuously is experimental. No backwards-compatibility guarantees.
+
+  Matching continuously scales poorly, as it is stateful, and requires storing
+  file ids in memory. In addition, because it is memory-only, if a pipeline is
+  restarted, already processed files will be reprocessed. Consider an alternate
+  technique, such as Pub/Sub Notifications
+  (https://cloud.google.com/storage/docs/pubsub-notifications)
+  when using GCS if possible.
   """
   def __init__(
       self,
@@ -299,6 +306,11 @@ class MatchContinuously(beam.PTransform):
     self.match_upd = match_updated_files
     self.apply_windowing = apply_windowing
     self.empty_match_treatment = empty_match_treatment
+    _LOGGER.warning(
+        'Matching Continuously is stateful, and can scale poorly. '
+        'Consider using Pub/Sub Notifications '
+        '(https://cloud.google.com/storage/docs/pubsub-notifications) '
+        'if possible')

   def expand(self, pbegin) -> beam.PCollection[filesystem.FileMetadata]:
     # invoke periodic impulse