florian-trehaut opened a new pull request, #38220:
URL: https://github.com/apache/beam/pull/38220

   Addresses #19868: adds the `GroupIntoBatches` and `GroupIntoBatchesWithShardedKey` transforms to the Go SDK, bringing feature parity with the Java and Python SDKs.
   
   ## Summary
   
   * **`transforms/batch.GroupIntoBatches(s, params, col)`**: a stateful DoFn that buffers values per key and emits `PCollection<KV<K, []V>>` under four conditions. It emits when the `BatchSize` threshold is reached, when the `BatchSizeBytes` threshold is reached, when `MaxBufferingDuration` elapses, or when the window expires (end of window plus allowed lateness). Arbitrary `K`/`V` types are supported via `typex` universal types, which are resolved at graph construction.
   * **`transforms/batch.GroupIntoBatchesWithShardedKey(s, params, col)`**: a sharded variant that spreads processing of a single hot logical key across workers. Its output shape, `KV<K, []V>`, is identical to that of `GroupIntoBatches`. Unlike Java and Python, Go does not surface `ShardedKey<K>` downstream (see the godoc for why).
   * **Core SDK additions**:
     * `*coder.Coder.IsDeterministic()`, plus `coder.RegisterDeterministicCoder(t, enc, dec)` so users can opt custom coders in as deterministic.
     * `beam.Coder.IsDeterministic()` and `beam.PCollection.WindowingStrategy()`: public accessors for information that was previously available only inside the `beam` package.
     * `typex.ShardedKey` composite type plus the `beam:coder:sharded_key:v1` URN, wired into `graphx` marshal/unmarshal and `exec` encode/decode. **Verified byte-identical against the 4 published `standard_coders.yaml:506-521` fixtures**, so ShardedKey bytes emitted by Go round-trip across SDKs on Dataflow/Flink.
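
The count and byte flush conditions of `GroupIntoBatches` can be illustrated outside Beam with a minimal, hypothetical per-key buffer. This sketch assumes string values sized by `len` and leaves timers out entirely. In the transform, the `MaxBufferingDuration` and end-of-window flushes are driven by Beam timers; here an explicit `Flush` call stands in for them. The names `keyBuffer`, `Add`, and `Flush` are illustrative and are not part of the PR.

```go
package main

import "fmt"

// keyBuffer is a hypothetical stand-in for the per-key state a stateful
// batching DoFn keeps: the buffered values plus a running byte total.
type keyBuffer struct {
	batchSize      int64 // flush at this many values (0 = no count limit)
	batchSizeBytes int64 // flush at this many bytes (0 = no byte limit)
	values         []string
	bytes          int64
}

// Add buffers v and returns a completed batch if either threshold is
// reached, or nil if the value was only buffered.
func (b *keyBuffer) Add(v string) []string {
	b.values = append(b.values, v)
	b.bytes += int64(len(v))
	if (b.batchSize > 0 && int64(len(b.values)) >= b.batchSize) ||
		(b.batchSizeBytes > 0 && b.bytes >= b.batchSizeBytes) {
		return b.Flush()
	}
	return nil
}

// Flush emits whatever is buffered (in the real transform, a timer firing
// would trigger this) and resets the state. It returns nil when empty.
func (b *keyBuffer) Flush() []string {
	if len(b.values) == 0 {
		return nil
	}
	out := b.values
	b.values = nil
	b.bytes = 0
	return out
}

func main() {
	b := &keyBuffer{batchSize: 3}
	for _, v := range []string{"a", "b", "c", "d"} {
		if batch := b.Add(v); batch != nil {
			fmt.Println("batch:", batch) // prints "batch: [a b c]"
		}
	}
	fmt.Println("on timer:", b.Flush()) // prints "on timer: [d]"
}
```

Note that an `Add` that crosses a threshold also resets the state. The leftover tail then waits for the next threshold or for a timer-driven flush.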
   
   ## Public API
   
   ```go
   import (
       "time"

       "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/batch"
   )

   // count threshold (BatchSizeBytes sets a byte threshold instead)
   batched := batch.GroupIntoBatches(s, batch.Params{BatchSize: 100}, kvs)
   // with buffering timeout for streaming
   timed := batch.GroupIntoBatches(s, batch.Params{BatchSize: 100, MaxBufferingDuration: 5 * time.Second}, kvs)
   // with sharding for hot keys on distributed runners
   sharded := batch.GroupIntoBatchesWithShardedKey(s, batch.Params{BatchSize: 100}, kvs)
   ```
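
The sharded call above helps with hot keys because grouping happens on a (shard, key) pair rather than on the key alone. One logical key can then occupy several stateful partitions on different workers. The sketch below assumes each DoFn instance draws a random shard ID once at setup. `shardedKey`, `sharder`, and `newSharder` are hypothetical names, not the PR's `typex.ShardedKey` API, and the transform's actual shard-ID scheme may differ.

```go
package main

import (
	"crypto/rand"
	"fmt"
)

// shardedKey is a hypothetical (shard ID, key) pair. Grouping on the whole
// pair lets one logical key be spread over several state partitions.
type shardedKey struct {
	ShardID string // opaque bytes, fixed for the lifetime of one instance
	Key     string
}

// sharder mimics a DoFn instance that picks its shard ID at setup time.
type sharder struct {
	shardID string
}

// newSharder draws a random 16-byte shard ID for this instance.
func newSharder() (*sharder, error) {
	id := make([]byte, 16)
	if _, err := rand.Read(id); err != nil {
		return nil, err
	}
	return &sharder{shardID: string(id)}, nil
}

// Tag wraps a key with this instance's shard ID.
func (s *sharder) Tag(key string) shardedKey {
	return shardedKey{ShardID: s.shardID, Key: key}
}

func main() {
	w1, err := newSharder()
	if err != nil {
		panic(err)
	}
	w2, err := newSharder()
	if err != nil {
		panic(err)
	}
	// The same hot key seen by two instances lands in two distinct groups,
	// while repeated elements on one instance share a group.
	groups := map[shardedKey]int{}
	groups[w1.Tag("hot")]++
	groups[w1.Tag("hot")]++
	groups[w2.Tag("hot")]++
	fmt.Println("distinct groups for key \"hot\":", len(groups))
}
```

Keeping the ID fixed per instance means each instance accumulates a key's values in one state cell. Other instances fill separate cells, which is what spreads the load.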
   
   ## Test plan
   
   All tests pass on `prism`:
   
   * `TestCoder_IsDeterministic` (20+ cases covering composition: KV, iterable, CoGBK, nullable, windowed, custom opt-in, row)
   * `TestNewSK`, `TestSK_IsDeterministic`
   * `TestShardedKeyCoder_WireFormat`: all 4 `standard_coders.yaml` fixtures byte-identical
   * `TestGroupIntoBatches_CountLimit`: 1000 inputs, 10 keys, batch size 100
   * `TestGroupIntoBatches_ByteLimit`
   * `TestGroupIntoBatches_PerKey`
   * `TestGroupIntoBatches_IntValues` — non-string value type
   * `TestGroupIntoBatchesWithShardedKey_Construction`
   * `TestParams_validate`, `TestDefaultElementByteSize`, `TestIsBuiltinSizeable`
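
The composition cases listed for `TestCoder_IsDeterministic` follow a recursive rule. A composite coder is deterministic only if its own encoding is deterministic and every component coder is too. Below is a minimal sketch of that rule over a hypothetical coder tree. The set of kinds treated as deterministic is an assumption for illustration, not the SDK's actual table. The `optedIn` flag stands in for registration via `coder.RegisterDeterministicCoder`.

```go
package main

import "fmt"

// coderNode is a hypothetical coder tree: a kind plus component coders.
type coderNode struct {
	kind       string
	components []*coderNode
	optedIn    bool // custom coders only: registered as deterministic
}

// deterministicKinds lists kinds assumed (for this sketch) to encode
// deterministically on their own. Unknown kinds are treated as unsafe.
var deterministicKinds = map[string]bool{
	"bytes": true, "string": true, "varint": true, "bool": true,
	"kv": true, "iterable": true, "nullable": true,
}

// isDeterministic checks the node itself, then recurses into components.
func isDeterministic(c *coderNode) bool {
	switch {
	case c.kind == "custom":
		if !c.optedIn {
			return false
		}
	case !deterministicKinds[c.kind]:
		return false
	}
	for _, comp := range c.components {
		if !isDeterministic(comp) {
			return false
		}
	}
	return true
}

func main() {
	str := &coderNode{kind: "string"}
	custom := &coderNode{kind: "custom"}
	optIn := &coderNode{kind: "custom", optedIn: true}

	fmt.Println(isDeterministic(&coderNode{kind: "kv", components: []*coderNode{str, str}}))    // true
	fmt.Println(isDeterministic(&coderNode{kind: "kv", components: []*coderNode{str, custom}})) // false
	fmt.Println(isDeterministic(&coderNode{kind: "iterable", components: []*coderNode{optIn}})) // true
}
```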
   
   ## Design notes for reviewers
   
   Open API decisions (marked P1-P7 in the issue-linked spec) where committer pushback is welcome:
   
   * **Params struct vs. functional options**: chose a struct to allow explicit validation (`Params.validate()`); this aligns with Python's `_GroupIntoBatchesParams`.
   * **`int64` for `BatchSize` / `BatchSizeBytes`**: keeps parity with the proto definition and avoids 32-bit overflow.
   * **Package path `transforms/batch`** — short, extensible 
(`batch.GroupIntoBatches`, `batch.GroupIntoBatchesWithShardedKey`).
   * **`ShardedKey` as composite marker (`typex.KV` shape)**: this produces a valid wire format. However, exposing a consumer-facing struct representation would require changing Go SDK type binding to accept generic structs as DoFn outputs. Happy to coordinate on that follow-up.
   * **Processing-time + event-time timer DoFns split by shape**: Prism stalls if an unused timer family is declared on the DoFn struct. Splitting the transform into `groupIntoBatchesFn` and `groupIntoBatchesBufferedFn` avoids the stall without needing a Prism-side change. If an SDK-wide fix lands, the two DoFns can be merged back trivially.
   * **BatchSizeBytes limited to primitive value types**: user-provided `ElementByteSize` functions cannot be serialised across workers with the current registration infrastructure. Primitive types cover the common sink use case (`[]byte` / `string`) without compromising correctness. Support for other types could be added via `beam.RegisterFunction` if needed.
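
With `BatchSizeBytes` restricted to primitive value types, the byte size can come from a fixed type switch. No user-supplied function is involved, so nothing needs cross-worker registration. Below is a hypothetical sketch of such a default sizer. The function name and the exact set of supported types are assumptions and are not taken from the PR's `size.go`. Platform-sized `int`/`uint` are deliberately left out.

```go
package main

import "fmt"

// defaultElementByteSize returns the payload size in bytes for a few
// built-in types, and ok=false for anything else (e.g. user structs).
func defaultElementByteSize(v any) (size int64, ok bool) {
	switch x := v.(type) {
	case []byte:
		return int64(len(x)), true
	case string:
		return int64(len(x)), true // UTF-8 byte length, not rune count
	case bool, int8, uint8:
		return 1, true
	case int16, uint16:
		return 2, true
	case int32, uint32, float32:
		return 4, true
	case int64, uint64, float64:
		return 8, true
	default:
		return 0, false
	}
}

func main() {
	// Prints "6 true", "3 true", "8 true", "0 false" on separate lines.
	for _, v := range []any{"héllo", []byte{1, 2, 3}, int64(7), struct{}{}} {
		n, ok := defaultElementByteSize(v)
		fmt.Println(n, ok)
	}
}
```

Returning `ok=false` for unsupported types lets a caller reject a byte limit at construction time rather than silently mis-sizing elements.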
   
   ## Known follow-ups
   
   * `TestGroupIntoBatchesWithShardedKey_Construction` validates pipeline construction but does not execute the pipeline. Prism panics inside `aggregateStageKind.buildEventTimeBundle` with "assignment to entry in nil map" on the 3-stage round-trip. This is a Prism regression independent of the transform (happy to file and triage separately).
   * Cross-SDK `ShardedKey` bidirectional pipelines (e.g., Java → Go) are pending maturation of the `register.RegisterShardedKeyType[K]` API.
   
   ## Files changed
   
   New:
   * `sdks/go/pkg/beam/transforms/batch/batch.go`
   * `sdks/go/pkg/beam/transforms/batch/batch_prism_test.go`
   * `sdks/go/pkg/beam/transforms/batch/batch_test.go`
   * `sdks/go/pkg/beam/transforms/batch/doc.go`
   * `sdks/go/pkg/beam/transforms/batch/size.go`
   * `sdks/go/pkg/beam/transforms/batch/size_test.go`
   * `sdks/go/pkg/beam/core/graph/coder/sharded_key_test.go`
   
   Modified:
   * `CHANGES.md` under `[2.74.0] - Unreleased`
   * `sdks/go/pkg/beam/coder.go`, `pcollection.go` — public accessors
   * `sdks/go/pkg/beam/core/graph/coder/coder.go`, `registry.go`: `IsDeterministic`, `NewSK`, `RegisterDeterministicCoder`
   * `sdks/go/pkg/beam/core/runtime/exec/coder.go` — ShardedKey encode/decode
   * `sdks/go/pkg/beam/core/runtime/graphx/coder.go` — URN wiring
   * `sdks/go/pkg/beam/core/typex/special.go`, `class.go`, `fulltype.go`: ShardedKey composite
   
   ---
   
   Happy to iterate on any committer feedback.


-- 
This is an automated message from the Apache Git Service.