This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new bb310e0b87a Document compact lifecycle method (#29466)
bb310e0b87a is described below
commit bb310e0b87a2caa932ea11c1a8af301c0fca44f4
Author: Danny McCormick <[email protected]>
AuthorDate: Fri Nov 17 08:33:12 2023 -0800
Document compact lifecycle method (#29466)
* Update programming-guide.md
* Update python example
* Update go compact example
* Add note
* Wording/link
* Fix function name
---
sdks/go/examples/snippets/04transforms.go | 5 +++++
.../apache_beam/examples/snippets/snippets_test.py | 4 ++++
.../content/en/documentation/programming-guide.md | 23 ++++++++++++++++------
3 files changed, 26 insertions(+), 6 deletions(-)
diff --git a/sdks/go/examples/snippets/04transforms.go b/sdks/go/examples/snippets/04transforms.go
index bb3abe3cf83..a9c28369198 100644
--- a/sdks/go/examples/snippets/04transforms.go
+++ b/sdks/go/examples/snippets/04transforms.go
@@ -332,6 +332,11 @@ func (fn *averageFn) ExtractOutput(a averageAccum) float64 {
return float64(a.Sum) / float64(a.Count)
}
+func (fn *averageFn) Compact(a averageAccum) averageAccum {
+ // No-op
+ return a
+}
+
func init() {
register.Combiner3[averageAccum, int, float64](&averageFn{})
}
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index ec52ca37af3..e8cb8960cf4 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -1261,6 +1261,10 @@ class CombineTest(unittest.TestCase):
(sum, count) = sum_count
return sum / count if count else float('NaN')
+ def compact(self, accumulator):
+ # No-op
+ return accumulator
+
# [END combine_custom_average_define]
# [START combine_custom_average_execute]
average = pc | beam.CombineGlobally(AverageFn())
diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md
index 564b01a7146..92163c12ab8 100644
--- a/website/www/site/content/en/documentation/programming-guide.md
+++ b/website/www/site/content/en/documentation/programming-guide.md
@@ -1598,10 +1598,12 @@ You should use a `CombineFn` if the combine function requires a more sophisticat
accumulator, must perform additional pre- or post-processing, might change the
output type, or takes the key into account.
-A general combining operation consists of four operations. When you create a
+A general combining operation consists of five operations. When you create a
<span class="language-java language-py">subclass of</span>
-`CombineFn`, you must provide four operations by overriding the
-corresponding methods:
+`CombineFn`, you must provide five operations by overriding the
+corresponding methods. Only `MergeAccumulators` is a required method. The
+others will have a default interpretation based on the accumulator type. The
+lifecycle methods are:
1. **Create Accumulator** creates a new "local" accumulator. In the example
case, taking a mean average, a local accumulator tracks the running sum of
@@ -1623,6 +1625,14 @@ corresponding methods:
mean average, this means dividing the combined sum of all the values by the
number of values summed. It is called once on the final, merged accumulator.
+5. **Compact** returns a more compact representation of the accumulator. This is
+ called before an accumulator is sent across the wire, and can be useful in
+ cases where values are buffered or otherwise lazily kept unprocessed when
+ added to the accumulator. Compact should return an equivalent, though
+ possibly modified, accumulator. In most cases, Compact is not necessary. For
+ a real world example of using Compact, see the Python SDK implementation of
+ [TopCombineFn](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/combiners.py#L523)
+
The following example code shows how to define a `CombineFn` that computes a
mean average:
@@ -1657,6 +1667,10 @@ public class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> {
public Double extractOutput(Accum accum) {
return ((double) accum.sum) / accum.count;
}
+
+ // No-op
+ @Override
+ public Accum compact(Accum accum) { return accum; }
}
{{< /highlight >}}
@@ -1675,9 +1689,6 @@ pc = ...
<span class="language-go">
-> **Note**: Only `MergeAccumulators` is a required method. The others will have a default interpretation
-> based on the accumulator type.
-
</span>
##### 4.2.4.3. Combining a PCollection into a single value {#combining-pcollection}
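
The examples added in this commit all implement Compact as a no-op. As a rough illustration of the buffering case that the new guide text describes, here is a minimal sketch in plain Python. It is not taken from the commit or from Beam's `TopCombineFn`: `TopNFn` and `simulate` are hypothetical names, the class deliberately does not depend on Beam (in a real pipeline it would subclass `beam.CombineFn`, which defines the same five method names), and the driver is a hand-rolled stand-in for a runner.

```python
import heapq


class TopNFn:
    """Hypothetical stand-in for a CombineFn that keeps the n largest inputs."""

    def __init__(self, n):
        self._n = n

    def create_accumulator(self):
        # The accumulator is a plain list that buffers inputs unprocessed.
        return []

    def add_input(self, accumulator, element):
        # Cheap append; trimming the buffer is deferred to compact().
        accumulator.append(element)
        return accumulator

    def merge_accumulators(self, accumulators):
        # Concatenate all buffers into one.
        merged = []
        for accumulator in accumulators:
            merged.extend(accumulator)
        return merged

    def compact(self, accumulator):
        # Return an equivalent accumulator that holds at most n elements.
        # Dropped elements could never appear in the final top-n result.
        return heapq.nlargest(self._n, accumulator)

    def extract_output(self, accumulator):
        # The n largest values, in descending order.
        return heapq.nlargest(self._n, accumulator)


def simulate(fn, bundles):
    """Assumed driver: one accumulator per bundle, compacted before merging."""
    compacted = []
    for bundle in bundles:
        accumulator = fn.create_accumulator()
        for element in bundle:
            accumulator = fn.add_input(accumulator, element)
        compacted.append(fn.compact(accumulator))
    return fn.extract_output(fn.merge_accumulators(compacted))
```

In this sketch, `compact` shrinks each buffer before it would be shipped between workers, while `extract_output` gives the same answer whether or not `compact` ran, which is the equivalence the guide text asks for.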