This is an automated email from the ASF dual-hosted git repository.

udim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 706e20f  [BEAM-9487] Disable allowing unsafe triggers by default
     new cf8e08f  Merge pull request #15340 from zhoufek/gbk_233
706e20f is described below

commit 706e20f7c3f0dafe834981e2dd2715082d38e548
Author: zhoufek <zhou...@google.com>
AuthorDate: Tue Aug 17 09:03:56 2021 -0400

    [BEAM-9487] Disable allowing unsafe triggers by default
---
 CHANGES.md                                         |  6 +++--
 .../python/apache_beam/options/pipeline_options.py |  3 +--
 sdks/python/apache_beam/transforms/core.py         | 29 ++++++++++------------
 3 files changed, 18 insertions(+), 20 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 6da170d..6fc25b8 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -63,7 +63,8 @@
 
 ## Breaking Changes
 
-* X behavior was changed 
([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+* Python GBK by default will fail on unbounded PCollections that have global 
windowing and a default trigger. The `--allow_unsafe_triggers` flag can be used 
to override this. 
([BEAM-9487](https://issues.apache.org/jira/browse/BEAM-9487)).
+* Python GBK will fail if it detects an unsafe trigger unless the 
`--allow_unsafe_triggers` flag is set. 
([BEAM-9487](https://issues.apache.org/jira/browse/BEAM-9487)).
 
 ## Deprecations
 
@@ -123,7 +124,8 @@
 
 ## Deprecations
 
-* X behavior is deprecated and will be removed in X versions 
([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+* Python GBK will stop supporting unbounded PCollections that have global 
windowing and a default trigger in Beam 2.33. This can be overridden with 
`--allow_unsafe_triggers`. 
([BEAM-9487](https://issues.apache.org/jira/browse/BEAM-9487)).
+* Python GBK will start requiring safe triggers or the 
`--allow_unsafe_triggers` flag starting with Beam 2.33. 
([BEAM-9487](https://issues.apache.org/jira/browse/BEAM-9487)).
 
 ## Known Issues
 
diff --git a/sdks/python/apache_beam/options/pipeline_options.py 
b/sdks/python/apache_beam/options/pipeline_options.py
index c597b4a..bba56ef 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -534,8 +534,7 @@ class TypeOptions(PipelineOptions):
         'compatibility. See BEAM-11719.')
     parser.add_argument(
         '--allow_unsafe_triggers',
-        # TODO(BEAM-9487): Set to False for Beam 2.33
-        default=True,
+        default=False,
         action='store_true',
         help='Allow the use of unsafe triggers. Unsafe triggers have the '
         'potential to cause data loss due to finishing and/or never having '
diff --git a/sdks/python/apache_beam/transforms/core.py 
b/sdks/python/apache_beam/transforms/core.py
index 93d29ed..25b05df 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -2320,10 +2320,10 @@ class GroupByKey(PTransform):
       if pcoll.pipeline.allow_unsafe_triggers:
         # TODO(BEAM-9487) Change comment for Beam 2.33
         _LOGGER.warning(
-            'PCollection passed to GroupByKey (label: %s) is unbounded, has a '
-            'global window, and uses a default trigger. This will no longer '
-            'be allowed starting with Beam 2.33 unless '
-            '--allow_unsafe_triggers is set.',
+            '%s: PCollection passed to GroupByKey is unbounded, has a global '
+            'window, and uses a default trigger. This is being allowed '
+            'because --allow_unsafe_triggers is set, but it may prevent '
+            'data from making it through the pipeline.',
             self.label)
       else:
         raise ValueError(
@@ -2332,22 +2332,19 @@ class GroupByKey(PTransform):
 
     unsafe_reason = trigger.may_lose_data(windowing)
     if unsafe_reason != DataLossReason.NO_POTENTIAL_LOSS:
+      reason_msg = str(unsafe_reason).replace('DataLossReason.', '')
       if pcoll.pipeline.allow_unsafe_triggers:
-        # TODO(BEAM-9487): Switch back to this log for Beam 2.33.
-        # _LOGGER.warning(
-        #   'Skipping trigger safety check. '
-        #   'This could lead to incomplete or missing groups.')
         _LOGGER.warning(
-            '%s: Unsafe trigger type (%s) detected. Starting with '
-            'Beam 2.33, this will raise an error by default. '
-            'Either change the pipeline to use a safe trigger or '
-            'set the --allow_unsafe_triggers flag.',
+            '%s: Unsafe trigger `%s` detected (reason: %s). This is '
+            'being allowed because --allow_unsafe_triggers is set. This could '
+            'lead to missing or incomplete groups.',
             self.label,
-            unsafe_reason)
+            trigger,
+            reason_msg)
       else:
-        msg = 'Unsafe trigger: `{}` may lose data. '.format(trigger)
-        msg += 'Reason: {}. '.format(
-            str(unsafe_reason).replace('DataLossReason.', ''))
+        msg = '{}: Unsafe trigger: `{}` may lose data. '.format(
+            self.label, trigger)
+        msg += 'Reason: {}. '.format(reason_msg)
         msg += 'This can be overriden with the --allow_unsafe_triggers flag.'
         raise ValueError(msg)
 

Reply via email to