This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new bb310e0b87a Document compact lifecycle method (#29466)
bb310e0b87a is described below

commit bb310e0b87a2caa932ea11c1a8af301c0fca44f4
Author: Danny McCormick <[email protected]>
AuthorDate: Fri Nov 17 08:33:12 2023 -0800

    Document compact lifecycle method (#29466)
    
    * Update programming-guide.md
    
    * Update python example
    
    * Update go compact example
    
    * Add note
    
    * Wording/link
    
    * Fix function name
---
 sdks/go/examples/snippets/04transforms.go          |  5 +++++
 .../apache_beam/examples/snippets/snippets_test.py |  4 ++++
 .../content/en/documentation/programming-guide.md  | 23 ++++++++++++++++------
 3 files changed, 26 insertions(+), 6 deletions(-)

diff --git a/sdks/go/examples/snippets/04transforms.go b/sdks/go/examples/snippets/04transforms.go
index bb3abe3cf83..a9c28369198 100644
--- a/sdks/go/examples/snippets/04transforms.go
+++ b/sdks/go/examples/snippets/04transforms.go
@@ -332,6 +332,11 @@ func (fn *averageFn) ExtractOutput(a averageAccum) float64 {
        return float64(a.Sum) / float64(a.Count)
 }
 
+func (fn *averageFn) Compact(a averageAccum) averageAccum {
+       // No-op
+       return a
+}
+
 func init() {
        register.Combiner3[averageAccum, int, float64](&averageFn{})
 }
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index ec52ca37af3..e8cb8960cf4 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -1261,6 +1261,10 @@ class CombineTest(unittest.TestCase):
         (sum, count) = sum_count
         return sum / count if count else float('NaN')
 
+      def compact(self, accumulator):
+        # No-op
+        return accumulator
+
     # [END combine_custom_average_define]
     # [START combine_custom_average_execute]
     average = pc | beam.CombineGlobally(AverageFn())
diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md
index 564b01a7146..92163c12ab8 100644
--- a/website/www/site/content/en/documentation/programming-guide.md
+++ b/website/www/site/content/en/documentation/programming-guide.md
@@ -1598,10 +1598,12 @@ You should use a `CombineFn` if the combine function requires a more sophisticat
 accumulator, must perform additional pre- or post-processing, might change the
 output type, or takes the key into account.
 
-A general combining operation consists of four operations. When you create a
+A general combining operation consists of five operations. When you create a
 <span class="language-java language-py">subclass of</span>
-`CombineFn`, you must provide four operations by overriding the
-corresponding methods:
+`CombineFn`, you must provide five operations by overriding the
+corresponding methods. Only `MergeAccumulators` is a required method. The
+others will have a default interpretation based on the accumulator type. The
+lifecycle methods are:
 
 1. **Create Accumulator** creates a new "local" accumulator. In the example
    case, taking a mean average, a local accumulator tracks the running sum of
@@ -1623,6 +1625,14 @@ corresponding methods:
    mean average, this means dividing the combined sum of all the values by the
    number of values summed. It is called once on the final, merged accumulator.
 
+5. **Compact** returns a more compact representation of the accumulator. This is
+    called before an accumulator is sent across the wire, and can be useful in
+    cases where values are buffered or otherwise lazily kept unprocessed when
+    added to the accumulator. Compact should return an equivalent, though
+    possibly modified, accumulator. In most cases, Compact is not necessary. For
+    a real-world example of using Compact, see the Python SDK implementation of
+    [TopCombineFn](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/combiners.py#L523).
+
 The following example code shows how to define a `CombineFn` that computes a
 mean average:
 
@@ -1657,6 +1667,10 @@ public class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> {
   public Double extractOutput(Accum accum) {
     return ((double) accum.sum) / accum.count;
   }
+
+  // No-op
+  @Override
+  public Accum compact(Accum accum) { return accum; }
 }
 {{< /highlight >}}
 
@@ -1675,9 +1689,6 @@ pc = ...
 
 <span class="language-go">
 
-> **Note**: Only `MergeAccumulators` is a required method. The others will have a default interpretation
-> based on the accumulator type.
-
 </span>
 
 ##### 4.2.4.3. Combining a PCollection into a single value {#combining-pcollection}
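
The snippets in this commit only add no-op `compact` methods, so the buffering case the new Compact paragraph describes is not illustrated. The following is a minimal plain-Python sketch of that case. It does not import Apache Beam. The class `LazyTopNFn` and the driver `simulate_combine` are hypothetical names chosen for illustration, and the driver only approximates how a runner might call `compact` before shipping an accumulator. The sketch is loosely in the spirit of a top-N combiner and is not the actual `TopCombineFn` code. In a real pipeline the same methods would live on a `beam.CombineFn` subclass.

```python
import heapq


class LazyTopNFn:
    """Hypothetical CombineFn-style class (plain Python, no Beam import).

    Method names mirror the Python SDK's CombineFn. The accumulator is a
    plain list that buffers inputs unprocessed until compact() trims it.
    """

    def __init__(self, n):
        self._n = n

    def create_accumulator(self):
        # A fresh, empty buffer.
        return []

    def add_input(self, accumulator, element):
        # Cheap append: no sorting or trimming on the hot path.
        accumulator.append(element)
        return accumulator

    def merge_accumulators(self, accumulators):
        # Concatenate all buffers, then trim the result.
        merged = []
        for acc in accumulators:
            merged.extend(acc)
        return self.compact(merged)

    def compact(self, accumulator):
        # Keep only the n largest values. The result is smaller but
        # equivalent, because no discarded value can reach the output.
        return heapq.nlargest(self._n, accumulator)

    def extract_output(self, accumulator):
        # The n largest values, in descending order.
        return heapq.nlargest(self._n, accumulator)


def simulate_combine(fn, shards):
    """Hypothetical driver: one accumulator per shard, compacted before it
    would be sent across the wire, then merged and extracted."""
    shipped = []
    for shard in shards:
        acc = fn.create_accumulator()
        for element in shard:
            acc = fn.add_input(acc, element)
        shipped.append(fn.compact(acc))
    return fn.extract_output(fn.merge_accumulators(shipped))


if __name__ == "__main__":
    fn = LazyTopNFn(3)
    shards = [[5, 1, 9, 3], [7, 2], [8, 6, 4, 10]]
    print(simulate_combine(fn, shards))  # [10, 9, 8]
```

Here `add_input` stays cheap because it only appends. The cost of trimming is paid once per accumulator in `compact`, which bounds how much data each shipped accumulator carries. For an accumulator that is already minimal, such as the `(sum, count)` pair in the averaging examples above, `compact` has nothing to do, which is why those snippets return the accumulator unchanged.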
