sjvanrossum commented on PR #31608: URL: https://github.com/apache/beam/pull/31608#issuecomment-2176125319
> The DataflowRunner overrides the pubsub write transform using org.apache.beam.runners.dataflow.DataflowRunner.StreamingPubsubIOWrite so org.apache.beam.runners.dataflow.worker.PubsubSink is used. It would be nice to prevent using the ordering key for now with the DataflowRunner unless the experiment to use the beam implementation is present.

Agreed, I'll throw an exception in `PubsubIO.Write#expand()` if the runner is `DataflowRunner` and the `enable_custom_pubsub_sink` experiment is not present, with details and instructions to resolve the issue.

> To add support for it to Dataflow, it appears that if PUBSUB_SERIALIZED_ATTRIBUTES_FN is set, that maps bytes to PubsubMessage which already includes the ordering key. But for the ordering key to be respected for publishing, additional changes would be needed in the dataflow service backend. Currently it looks like it would just be dropped but if it was respected the service would also need to be updated to ensure batching doesn't occur across ordering keys.

Agreed, I'll create a new bug for this to continue the discussion internally.

> > User configuration of the number of output shards or the use of a single output shard for messages with ordering keys (due to the 1 MBps throughput limit per ordering key) is an open topic.
>
> Are you considering producing to a single ordering key from multiple distinct grouped-by keys in parallel? Doesn't that defeat the purpose of the ordering provided? I'm also not sure it would increase the throughput beyond the 1 MBps per ordering key limit. An alternative would be grouping by partitioning of the ordering keys (via deterministic hash buckets, for example) and then batching just within a bundle.

The initial patch I wrote concatenated the topic and ordering key and left the output shards unchanged. After I reviewed the ordering key limitations I realized there's almost nothing to be gained there because of the per-key throughput limit.
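The runner guard mentioned above could be sketched roughly as follows. This is an illustrative assumption, not the actual patch: the class and method names (`OrderingKeyGuard`, `validate`) are hypothetical, and the real check would live inside `PubsubIO.Write#expand()` using the pipeline options.

```java
import java.util.List;

// Hypothetical sketch of the proposed guard: reject ordering keys on the
// Dataflow runner unless the experiment enabling the Beam sink is present.
public class OrderingKeyGuard {
  static void validate(String runnerName, List<String> experiments, boolean usesOrderingKey) {
    boolean customSinkEnabled = experiments.contains("enable_custom_pubsub_sink");
    if (usesOrderingKey && "DataflowRunner".equals(runnerName) && !customSinkEnabled) {
      // Fail fast at pipeline construction time with actionable instructions.
      throw new IllegalArgumentException(
          "Writing Pub/Sub messages with an ordering key is not supported by the "
              + "Dataflow runner's native Pub/Sub sink. Add the "
              + "'enable_custom_pubsub_sink' experiment to use the Beam "
              + "implementation instead.");
    }
  }
}
```

Failing in `expand()` surfaces the problem at construction time rather than after the job is submitted and the ordering key is silently dropped.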
Since messages with and without ordering keys can be published to the same topic, and these limitations only apply to messages with ordering keys, I'll leave the shard allocation as is for messages without an ordering key and apply a deterministic hashing function to the ordering key to determine the shard number for messages with an ordering key. `PubsubBoundedWriter` batches by topic per bundle, and I'll extend and reuse that in `PubsubUnboundedSink` to batch by topic and ordering key as you suggested. That closely resembles what a `KafkaProducer` does behind the scenes to batch by topic and partition.
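The two ideas above can be sketched as follows. This is a minimal illustration under assumed names (`OrderingKeySharding`, `assignShard`, `BatchKey`), not the actual `PubsubUnboundedSink` code: keyless messages keep a random shard, keyed messages hash deterministically to a shard, and within a bundle messages are grouped per (topic, ordering key), analogous to `KafkaProducer` batching per (topic, partition).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;

public class OrderingKeySharding {
  // (1) Shard assignment: keyless messages keep the existing random spread;
  // keyed messages always map to the same shard, preserving per-key order.
  static int assignShard(String orderingKey, int numShards) {
    if (orderingKey == null || orderingKey.isEmpty()) {
      return ThreadLocalRandom.current().nextInt(numShards);
    }
    return Math.floorMod(orderingKey.hashCode(), numShards);
  }

  // (2) Batch key: messages are grouped per (topic, ordering key) within a
  // bundle so a publish batch never spans two ordering keys.
  static final class BatchKey {
    final String topic;
    final String orderingKey;

    BatchKey(String topic, String orderingKey) {
      this.topic = topic;
      this.orderingKey = orderingKey;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof BatchKey)) {
        return false;
      }
      BatchKey other = (BatchKey) o;
      return topic.equals(other.topic) && orderingKey.equals(other.orderingKey);
    }

    @Override
    public int hashCode() {
      return Objects.hash(topic, orderingKey);
    }
  }

  // Group payloads into per-(topic, ordering key) batches; each input element
  // is {topic, orderingKey, payload}.
  static Map<BatchKey, List<String>> batch(List<String[]> messages) {
    Map<BatchKey, List<String>> batches = new HashMap<>();
    for (String[] m : messages) {
      batches.computeIfAbsent(new BatchKey(m[0], m[1]), k -> new ArrayList<>()).add(m[2]);
    }
    return batches;
  }
}
```

Because the hash is deterministic, a given ordering key always lands on the same shard, so per-key ordering survives the sharded sink while keyless traffic still spreads across all shards.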