This is an automated email from the ASF dual-hosted git repository.

johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c93f7d17ba Feature/add warnings to file watchers (#28206)
3c93f7d17ba is described below

commit 3c93f7d17ba2f1b7e7324dbad441bdb241b1e724
Author: johnjcasey <95318300+johnjca...@users.noreply.github.com>
AuthorDate: Fri Sep 1 10:00:55 2023 -0400

    Feature/add warnings to file watchers (#28206)
    
    * Update 2.50 release notes to include new Kafka topicPattern feature
    
    * Add warning for using match continuously
    
    * add match continuously warning to python as well
    
    * fix lint errors
    
    * fix formatting again
---
 .../core/src/main/java/org/apache/beam/sdk/io/FileIO.java    |  9 +++++++++
 sdks/python/apache_beam/io/fileio.py                         | 12 ++++++++++++
 2 files changed, 21 insertions(+)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
index 5f319706e9e..2d28279f90b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
@@ -512,9 +512,18 @@ public class FileIO {
      * the watching frequency given by the {@code interval}. The pipeline will throw a {@code
      * RuntimeError} if timestamp extraction for the matched file has failed, suggesting the
      * timestamp metadata is not available with the IO connector.
+     *
+     * <p>Matching continuously scales poorly, as it is stateful, and requires storing file ids in
+     * memory. In addition, because it is memory-only, if a pipeline is restarted, already processed
+     * files will be reprocessed. Consider an alternate technique, such as <a
+     * href="https://cloud.google.com/storage/docs/pubsub-notifications">Pub/Sub Notifications</a>
+     * when using GCS if possible.
      */
     public MatchConfiguration continuously(
         Duration interval, TerminationCondition<String, ?> condition, boolean 
matchUpdatedFiles) {
+      LOG.warn(
+          "Matching Continuously is stateful, and can scale poorly. Consider 
using Pub/Sub "
+              + "Notifications 
(https://cloud.google.com/storage/docs/pubsub-notifications) if possible");
       return toBuilder()
           .setWatchInterval(interval)
           .setWatchTerminationCondition(condition)
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index a9297ca3d7a..23e979b44ca 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -266,6 +266,13 @@ class MatchContinuously(beam.PTransform):
 
   MatchContinuously is experimental.  No backwards-compatibility
   guarantees.
+
+  Matching continuously scales poorly, as it is stateful, and requires storing
+  file ids in memory. In addition, because it is memory-only, if a pipeline is
+  restarted, already processed files will be reprocessed. Consider an alternate
+  technique, such as Pub/Sub Notifications
+  (https://cloud.google.com/storage/docs/pubsub-notifications)
+  when using GCS if possible.
   """
   def __init__(
       self,
@@ -299,6 +306,11 @@ class MatchContinuously(beam.PTransform):
     self.match_upd = match_updated_files
     self.apply_windowing = apply_windowing
     self.empty_match_treatment = empty_match_treatment
+    _LOGGER.warning(
+        'Matching Continuously is stateful, and can scale poorly. '
+        'Consider using Pub/Sub Notifications '
+        '(https://cloud.google.com/storage/docs/pubsub-notifications) '
+        'if possible')
 
   def expand(self, pbegin) -> beam.PCollection[filesystem.FileMetadata]:
     # invoke periodic impulse

Reply via email to