florian-trehaut opened a new pull request, #38220:
URL: https://github.com/apache/beam/pull/38220
Addresses #19868: adds the `GroupIntoBatches` and
`GroupIntoBatchesWithShardedKey` transforms to the Go SDK, bringing it to
feature parity with the Java and Python SDKs.
## Summary
* **`transforms/batch.GroupIntoBatches(s, params, col)`** — stateful DoFn
that buffers values per key and emits `PCollection<KV<K, []V>>` whenever
`BatchSize`, `BatchSizeBytes`, `MaxBufferingDuration`, or end-of-window +
allowed lateness triggers fire. Arbitrary `K`/`V` types supported via `typex`
universals resolved at graph construction.
* **`transforms/batch.GroupIntoBatchesWithShardedKey(s, params, col)`** —
shard-qualified variant that spreads a single hot logical key's processing
across workers. Output shape `KV<K, []V>` identical to `GroupIntoBatches` (see
godoc for why Go does not surface `ShardedKey<K>` downstream like Java/Python).
* **Core SDK additions**:
  * `*coder.Coder.IsDeterministic()` + `coder.RegisterDeterministicCoder(t, enc, dec)`: opt-in deterministic registration for user custom coders.
  * `beam.Coder.IsDeterministic()` and `beam.PCollection.WindowingStrategy()`: public accessors previously limited to the `beam` package internals.
  * `typex.ShardedKey` composite type + `beam:coder:sharded_key:v1` URN wired in `graphx` marshal/unmarshal and `exec` encode/decode. **Verified byte-identical against the 4 published `standard_coders.yaml:506-521` fixtures**, so Go-emitted SK bytes round-trip cross-SDK on Dataflow/Flink.
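
For reviewers unfamiliar with the `beam:coder:sharded_key:v1` layout, here is a minimal standalone sketch. It assumes the encoding is the shard ID as a varint-length-prefixed byte string, followed by the key in its nested encoding (for a UTF-8 string key, again length-prefixed). `writeLengthPrefixed` and `encodeShardedStringKey` are hypothetical helper names for illustration, not functions in this PR.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// writeLengthPrefixed writes an unsigned varint length followed by the raw
// bytes. This is the assumed framing for nested byte strings and UTF-8 strings.
func writeLengthPrefixed(buf *bytes.Buffer, b []byte) {
	var tmp [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(tmp[:], uint64(len(b)))
	buf.Write(tmp[:n])
	buf.Write(b)
}

// encodeShardedStringKey is a hypothetical helper: shard ID first, then the
// string key, each length-prefixed.
func encodeShardedStringKey(shardID []byte, key string) []byte {
	var buf bytes.Buffer
	writeLengthPrefixed(&buf, shardID)
	writeLengthPrefixed(&buf, []byte(key))
	return buf.Bytes()
}

func main() {
	// Print the encoded bytes in hex for a two-byte shard ID and a short key.
	fmt.Printf("% x\n", encodeShardedStringKey([]byte("ab"), "key"))
}
```

The actual implementation in `exec` delegates the key half to whatever component coder is attached, so only the shard-ID framing is fixed.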
## Public API
```go
import (
	"time"

	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/batch"
)

// Count threshold (set BatchSizeBytes instead for a byte threshold).
byCount := batch.GroupIntoBatches(s, batch.Params{BatchSize: 100}, kvs)

// Count threshold plus a buffering timeout, for streaming.
byCountOrTime := batch.GroupIntoBatches(s, batch.Params{
	BatchSize:            100,
	MaxBufferingDuration: 5 * time.Second,
}, kvs)

// Sharded variant for hot keys on distributed runners.
sharded := batch.GroupIntoBatchesWithShardedKey(s, batch.Params{BatchSize: 100}, kvs)
```
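
To make the count-based semantics concrete, here is a plain-Go simulation that does not use Beam at all. It is a sketch only: `kv`, `batch`, and `groupIntoBatches` are hypothetical names, the final flush stands in for the end-of-window trigger, and timers, byte limits, and windowing are ignored.

```go
package main

import "fmt"

// kv is a stand-in for a KV<string, int> input element.
type kv struct {
	Key   string
	Value int
}

// batch is a stand-in for one emitted KV<K, []V>.
type batch struct {
	Key    string
	Values []int
}

// groupIntoBatches buffers values per key and emits a batch as soon as a
// key's buffer reaches batchSize. Leftovers are flushed at the end in
// first-seen key order, imitating an end-of-window flush.
func groupIntoBatches(in []kv, batchSize int) []batch {
	buffers := map[string][]int{}
	var order []string
	var out []batch
	for _, e := range in {
		if _, seen := buffers[e.Key]; !seen {
			order = append(order, e.Key)
		}
		buffers[e.Key] = append(buffers[e.Key], e.Value)
		if len(buffers[e.Key]) >= batchSize {
			out = append(out, batch{Key: e.Key, Values: buffers[e.Key]})
			buffers[e.Key] = nil // start a fresh buffer for this key
		}
	}
	for _, k := range order {
		if len(buffers[k]) > 0 {
			out = append(out, batch{Key: k, Values: buffers[k]})
		}
	}
	return out
}

func main() {
	in := []kv{{"a", 1}, {"b", 10}, {"a", 2}, {"a", 3}, {"b", 20}}
	for _, b := range groupIntoBatches(in, 2) {
		fmt.Println(b.Key, b.Values)
	}
}
```

In the real transform the per-key buffer lives in Beam state rather than a local map, which is what lets the runner checkpoint and redistribute it.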
## Test plan
All tests pass on `prism`:
* `TestCoder_IsDeterministic` (20+ cases, composition: KV, iterable, CoGBK,
nullable, windowed, custom opt-in, row)
* `TestNewSK`, `TestSK_IsDeterministic`
* `TestShardedKeyCoder_WireFormat` — all 4 `standard_coders.yaml` fixtures
byte-identical
* `TestGroupIntoBatches_CountLimit` — 1000 inputs, 10 keys, batch 100
* `TestGroupIntoBatches_ByteLimit`
* `TestGroupIntoBatches_PerKey`
* `TestGroupIntoBatches_IntValues` — non-string value type
* `TestGroupIntoBatchesWithShardedKey_Construction`
* `TestParams_validate`, `TestDefaultElementByteSize`,
`TestIsBuiltinSizeable`
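
The composition cases listed for `TestCoder_IsDeterministic` can be read as a recursive rule: a composite coder is deterministic only if every component is. The sketch below illustrates that rule with a hypothetical, simplified `coder` type. It is not the SDK's `*coder.Coder`, and the real implementation may special-case some coder kinds.

```go
package main

import "fmt"

// coder is a hypothetical, simplified coder tree used only for illustration.
type coder struct {
	Kind          string
	Deterministic bool // consulted for leaf coders only
	Components    []*coder
}

// IsDeterministic returns the leaf flag for leaf coders, and for composites
// returns true only when all components are deterministic.
func (c *coder) IsDeterministic() bool {
	if len(c.Components) == 0 {
		return c.Deterministic
	}
	for _, comp := range c.Components {
		if !comp.IsDeterministic() {
			return false
		}
	}
	return true
}

func main() {
	str := &coder{Kind: "string", Deterministic: true}
	custom := &coder{Kind: "custom", Deterministic: false} // not opted in
	kvOK := &coder{Kind: "kv", Components: []*coder{str, str}}
	kvBad := &coder{Kind: "kv", Components: []*coder{str, custom}}
	fmt.Println(kvOK.IsDeterministic(), kvBad.IsDeterministic())
}
```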
## Design notes for reviewers
Open API decisions (marked P1-P7 in the issue-linked spec) where committer
pushback is welcome:
* **Params struct vs. functional options** — went with a struct for explicit
validation (`Params.validate()`); aligns with Python `_GroupIntoBatchesParams`.
* **`int64` for `BatchSize` / `BatchSizeBytes`** — proto-parity, avoids
32-bit overflow.
* **Package path `transforms/batch`** — short, extensible
(`batch.GroupIntoBatches`, `batch.GroupIntoBatchesWithShardedKey`).
* **`ShardedKey` as composite marker (`typex.KV` shape)** — valid wire
format, but consumer-facing struct representation would require changing Go SDK
type-binding to accept generic structs as DoFn outputs. Happy to coordinate on
that follow-up.
* **Processing-time + event-time timer DoFns split by shape** — Prism stalls
if an unused timer family is declared on the DoFn struct; splitting into
`groupIntoBatchesFn` and `groupIntoBatchesBufferedFn` avoids this without
needing a Prism-side change. If an SDK-wide fix lands, the split collapses
trivially.
* **BatchSizeBytes limited to primitive value types** — user-provided
`ElementByteSize` funcs cannot serialise cross-worker with the current
registration infrastructure. Covers the common sink use case (`[]byte` /
`string`) without regressing correctness. Extensible via
`beam.RegisterFunction` if needed.
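
Two of the decisions above, the `Params` struct with explicit validation and byte sizing restricted to `[]byte` / `string`, can be sketched as follows. The field names come from this PR, but the specific validation rules and the `elementByteSize` helper are assumptions for illustration and may differ from the actual code.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Params mirrors the field names used in this PR. The checks in validate are
// assumed rules for illustration, not necessarily the PR's actual ones.
type Params struct {
	BatchSize            int64
	BatchSizeBytes       int64
	MaxBufferingDuration time.Duration
}

// validate rejects negative limits and requires at least one size limit.
func (p Params) validate() error {
	if p.BatchSize < 0 || p.BatchSizeBytes < 0 {
		return errors.New("batch: BatchSize and BatchSizeBytes must be non-negative")
	}
	if p.BatchSize == 0 && p.BatchSizeBytes == 0 {
		return errors.New("batch: set at least one of BatchSize or BatchSizeBytes")
	}
	if p.MaxBufferingDuration < 0 {
		return errors.New("batch: MaxBufferingDuration must be non-negative")
	}
	return nil
}

// elementByteSize is a hypothetical sizing helper that only understands
// []byte and string values and reports false for anything else.
func elementByteSize(v any) (int64, bool) {
	switch x := v.(type) {
	case []byte:
		return int64(len(x)), true
	case string:
		return int64(len(x)), true
	default:
		return 0, false
	}
}

func main() {
	fmt.Println(Params{BatchSize: 100}.validate())
	fmt.Println(Params{}.validate())
	fmt.Println(elementByteSize("hello"))
	fmt.Println(elementByteSize(42))
}
```

A struct keeps all cross-field checks in one place, which is harder to guarantee when each functional option validates itself in isolation.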
## Known follow-ups
* `TestGroupIntoBatchesWithShardedKey_Construction` validates pipeline
construction but does not execute because Prism panics inside
`aggregateStageKind.buildEventTimeBundle` with "assignment to entry in nil map"
on the 3-stage round-trip — this is a Prism regression independent of the
transform (happy to file & triage separately).
* Cross-SDK `ShardedKey` bidirectional pipelines (e.g., Java → Go) pending
`register.RegisterShardedKeyType[K]` API maturation.
## Files changed
New:
* `sdks/go/pkg/beam/transforms/batch/batch.go`
* `sdks/go/pkg/beam/transforms/batch/batch_prism_test.go`
* `sdks/go/pkg/beam/transforms/batch/batch_test.go`
* `sdks/go/pkg/beam/transforms/batch/doc.go`
* `sdks/go/pkg/beam/transforms/batch/size.go`
* `sdks/go/pkg/beam/transforms/batch/size_test.go`
* `sdks/go/pkg/beam/core/graph/coder/sharded_key_test.go`
Modified:
* `CHANGES.md` under `[2.74.0] - Unreleased`
* `sdks/go/pkg/beam/coder.go`, `pcollection.go` — public accessors
* `sdks/go/pkg/beam/core/graph/coder/coder.go`, `registry.go` —
IsDeterministic + NewSK + RegisterDeterministicCoder
* `sdks/go/pkg/beam/core/runtime/exec/coder.go` — ShardedKey encode/decode
* `sdks/go/pkg/beam/core/runtime/graphx/coder.go` — URN wiring
* `sdks/go/pkg/beam/core/typex/special.go`, `class.go`, `fulltype.go` —
ShardedKey composite
---
Happy to iterate on any committer feedback.