sjvanrossum commented on PR #31608:
URL: https://github.com/apache/beam/pull/31608#issuecomment-2176125319

   > The DataflowRunner overrides the pubsub write transform using 
org.apache.beam.runners.dataflow.DataflowRunner.StreamingPubsubIOWrite so 
org.apache.beam.runners.dataflow.worker.PubsubSink is used. It would be nice to 
prevent using the ordering key for now with the DataflowRunner unless the 
experiment to use the beam implementation is present.
   
   Agreed, I'll throw an exception in `PubsubIO.Write#expand()` when the runner is `DataflowRunner` and the `enable_custom_pubsub_sink` experiment is not present, with details and instructions on how to resolve the issue.
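   
   Roughly something like the following sketch (the helper name and exact wiring are illustrative; the runner is checked by class name since `PubsubIO` can't depend on the Dataflow runner module):
   
   ```java
   import org.apache.beam.sdk.options.ExperimentalOptions;
   import org.apache.beam.sdk.options.PipelineOptions;
   
   // Sketch of the validation I'd add in PubsubIO.Write#expand(); names are illustrative.
   private static void validateOrderingKeySupport(PipelineOptions options, boolean usesOrderingKey) {
     boolean isDataflow = "DataflowRunner".equals(options.getRunner().getSimpleName());
     boolean customSink = ExperimentalOptions.hasExperiment(options, "enable_custom_pubsub_sink");
     if (usesOrderingKey && isDataflow && !customSink) {
       throw new IllegalArgumentException(
           "Publishing with ordering keys is not supported by Dataflow's native Pub/Sub sink. "
               + "Add the enable_custom_pubsub_sink experiment to use the Beam implementation.");
     }
   }
   ```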
   
   > To add support for it to Dataflow, it appears that if 
PUBSUB_SERIALIZED_ATTRIBUTES_FN is set, that maps bytes to PubsubMessage which 
already includes the ordering key. But for the ordering key to be respected for 
publishing, additional changes would be needed in the dataflow service backend. 
Currently it looks like it would just be dropped but if it was respected the 
service would also need to be updated to ensure batching doesn't occur across 
ordering keys.
   
   Agreed, I'll file a new bug for this so the discussion can continue internally.
   
   > > User configuration of the number of output shards or the use of a single 
output shard for messages with ordering keys (due to 1 MBps throughput limit 
per ordering key) is an open topic.
   > 
   > Are you considering producing to a single ordering key from multiple 
distinct grouped-by keys in parallel? Doesn't that defeat the purpose of the 
ordering provided? I'm also not sure it would increase the throughput beyond 
the 1Mb per ordering key limit. An alternative would be grouping by 
partitioning of the ordering keys (via deterministic hash buckets for example) 
and then batching just within a bundle.
   
   The initial patch I wrote concatenated the topic and ordering key and left the number of output shards unchanged.
   After reviewing the ordering key limitations I realized there's almost nothing to be gained there because of the per-key throughput limit.
   Since messages with and without ordering keys can be published to the same topic, and these limitations only apply to messages with ordering keys, I'll leave the shard allocation as is for messages without an ordering key and apply a deterministic hash of the ordering key to determine the shard number for messages that have one.
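   
   Conceptually the shard assignment would look something like this (a sketch only; the method and parameter names are illustrative, and a stronger hash such as murmur3 could replace `hashCode()`):
   
   ```java
   import java.util.concurrent.ThreadLocalRandom;
   
   // Sketch of the shard assignment described above; numShards is whatever the sink is configured with.
   static int assignShard(String orderingKey, int numShards) {
     if (orderingKey == null || orderingKey.isEmpty()) {
       // No ordering key: keep the current behavior and spread messages across all shards.
       return ThreadLocalRandom.current().nextInt(numShards);
     }
     // Ordering key present: hash deterministically so every message with the same key
     // lands on the same shard.
     return Math.floorMod(orderingKey.hashCode(), numShards);
   }
   ```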
   
   `PubsubBoundedWriter` batches by topic per bundle, and I'll extend and reuse that logic in `PubsubUnboundedSink` to batch by topic and ordering key as you suggested.
   That closely resembles what a `KafkaProducer` does behind the scenes to batch by topic and partition.
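   
   In other words, within a bundle the batches would be keyed by the (topic, ordering key) pair, roughly like this (a sketch assuming the sink accumulates `OutgoingMessage`s per key; field and method names are illustrative):
   
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage;
   import org.apache.beam.sdk.values.KV;
   
   // Sketch of per-bundle batching keyed by (topic, ordering key); flushing details omitted.
   Map<KV<String, String>, List<OutgoingMessage>> batches = new HashMap<>();
   
   void addToBatch(String topic, String orderingKey, OutgoingMessage message) {
     KV<String, String> key = KV.of(topic, orderingKey == null ? "" : orderingKey);
     batches.computeIfAbsent(key, k -> new ArrayList<>()).add(message);
     // When a batch hits the byte or message-count limit, only that (topic, ordering key)
     // batch is flushed, so batching never crosses ordering keys, similar to how
     // KafkaProducer accumulates records per topic-partition.
   }
   ```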
   

