This is an automated email from the ASF dual-hosted git repository.

udim pushed a commit to branch release-2.33.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.33.0 by this push:
     new 1a7271d  [BEAM-9487] Revert PR15340 for Beam 2.33
     new 3b74c92  Merge pull request #15609 from zhoufek/reversion
1a7271d is described below

commit 1a7271d99efb2ca9718d0e894d7db5e8b8c4d486
Author: zhoufek <zhou...@google.com>
AuthorDate: Tue Sep 28 14:28:19 2021 -0400

    [BEAM-9487] Revert PR15340 for Beam 2.33
---
 CHANGES.md                                         |  5 ++--
 .../python/apache_beam/options/pipeline_options.py |  3 ++-
 sdks/python/apache_beam/transforms/core.py         | 31 ++++++++++++----------
 3 files changed, 21 insertions(+), 18 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index f2076b8..089ee9a 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -73,8 +73,6 @@
 
 ## Breaking Changes
 
-* Python GBK by defualt will fail on unbounded PCollections that have global 
windowing and a default trigger. The `--allow_unsafe_triggers` flag can be used 
to override this. 
([BEAM-9487](https://issues.apache.org/jira/browse/BEAM-9487)).
-* Python GBK will fail if it detects an unsafe trigger unless the 
`--allow_unsafe_triggers` flag is set. 
([BEAM-9487](https://issues.apache.org/jira/browse/BEAM-9487)).
 * Go SDK pipelines require new import paths to use this release due to 
migration to Go Modules.
   * `go.mod` files will need to change to require 
`github.com/apache/beam/sdks/v2`.
   * Code depending on beam imports need to include v2 on the module path.
@@ -83,7 +81,8 @@
 
 ## Deprecations
 
-* X behavior is deprecated and will be removed in X versions 
([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+* Python GBK will stop supporting unbounded PCollections that have global 
windowing and a default trigger in Beam 2.34. This can be overridden with 
`--allow_unsafe_triggers`. 
([BEAM-9487](https://issues.apache.org/jira/browse/BEAM-9487)).
+* Python GBK will start requiring safe triggers or the 
`--allow_unsafe_triggers` flag starting with Beam 2.34. 
([BEAM-9487](https://issues.apache.org/jira/browse/BEAM-9487)).
 
 ## Known Issues
 
diff --git a/sdks/python/apache_beam/options/pipeline_options.py 
b/sdks/python/apache_beam/options/pipeline_options.py
index bba56ef..622c9f0 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -534,7 +534,8 @@ class TypeOptions(PipelineOptions):
         'compatibility. See BEAM-11719.')
     parser.add_argument(
         '--allow_unsafe_triggers',
-        default=False,
+        # TODO(BEAM-9487): Set to False for Beam 2.34
+        default=True,
         action='store_true',
         help='Allow the use of unsafe triggers. Unsafe triggers have the '
         'potential to cause data loss due to finishing and/or never having '
diff --git a/sdks/python/apache_beam/transforms/core.py 
b/sdks/python/apache_beam/transforms/core.py
index 25b05df..d5a1abd 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -2318,12 +2318,12 @@ class GroupByKey(PTransform):
         windowing.windowfn, GlobalWindows) and isinstance(trigger,
                                                           DefaultTrigger):
       if pcoll.pipeline.allow_unsafe_triggers:
-        # TODO(BEAM-9487) Change comment for Beam 2.33
+        # TODO(BEAM-9487) Change comment for Beam 2.34
         _LOGGER.warning(
-            '%s: PCollection passed to GroupByKey is unbounded, has a global '
-            'window, and uses a default trigger. This is being allowed '
-            'because --allow_unsafe_triggers is set, but it may prevent '
-            'data from making it through the pipeline.',
+            'PCollection passed to GroupByKey (label: %s) is unbounded, has a '
+            'global window, and uses a default trigger. This will no longer '
+            'be allowed starting with Beam 2.34 unless '
+            '--allow_unsafe_triggers is set.',
             self.label)
       else:
         raise ValueError(
@@ -2332,19 +2332,22 @@ class GroupByKey(PTransform):
 
     unsafe_reason = trigger.may_lose_data(windowing)
     if unsafe_reason != DataLossReason.NO_POTENTIAL_LOSS:
-      reason_msg = str(unsafe_reason).replace('DataLossReason.', '')
       if pcoll.pipeline.allow_unsafe_triggers:
+        # TODO(BEAM-9487): Switch back to this log for Beam 2.34.
+        # _LOGGER.warning(
+        #   'Skipping trigger safety check. '
+        #   'This could lead to incomplete or missing groups.')
         _LOGGER.warning(
-            '%s: Unsafe trigger `%s` detected (reason: %s). This is '
-            'being allowed because --allow_unsafe_triggers is set. This could '
-            'lead to missing or incomplete groups.',
+            '%s: Unsafe trigger type (%s) detected. Starting with '
+            'Beam 2.34, this will raise an error by default. '
+            'Either change the pipeline to use a safe trigger or '
+            'set the --allow_unsafe_triggers flag.',
             self.label,
-            trigger,
-            reason_msg)
+            unsafe_reason)
       else:
-        msg = '{}: Unsafe trigger: `{}` may lose data. '.format(
-            self.label, trigger)
-        msg += 'Reason: {}. '.format(reason_msg)
+        msg = 'Unsafe trigger: `{}` may lose data. '.format(trigger)
+        msg += 'Reason: {}. '.format(
+            str(unsafe_reason).replace('DataLossReason.', ''))
         msg += 'This can be overriden with the --allow_unsafe_triggers flag.'
         raise ValueError(msg)
 

Reply via email to