gemini-code-assist[bot] commented on PR #31608:
URL: https://github.com/apache/beam/pull/31608#issuecomment-3312933955
## Summary of Changes
This pull request significantly enhances Apache Beam's Google Cloud Pub/Sub
I/O connector by enabling the use of ordering keys when writing messages. This
feature is crucial for applications that need messages sharing a key to be
delivered in the order they were published. The changes include a new
schema-based coder for `PubsubMessage` to improve future compatibility, refined
batching and sharding mechanisms within the Pub/Sub sink to handle ordering
keys correctly, and explicit error handling for the DataflowRunner when
ordering keys are used without the necessary experimental flag. Together,
these updates ensure that messages carrying ordering keys are batched, sharded,
and validated in a way that Pub/Sub accepts.
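
The Dataflow check amounts to a precondition on two pieces of configuration. The following is a minimal plain-Java sketch, not Beam code: `OrderingKeyGuard`, its `check` method, and the error text are hypothetical, and it assumes the experiments are available as a list of strings. Per the changelog, the real check in `DataflowRunner` looks at whether `PubsubUnboundedSink` is configured with ordering keys.

```java
import java.util.List;

// Hypothetical sketch of the DataflowRunner precondition; not Beam's implementation.
final class OrderingKeyGuard {
  // Experiment name taken from the PR description.
  static final String REQUIRED_EXPERIMENT = "enable_custom_pubsub_sink";

  /** Throws if ordering keys are requested without the required experiment. */
  static void check(boolean publishWithOrderingKey, List<String> experiments) {
    boolean hasExperiment = experiments != null && experiments.contains(REQUIRED_EXPERIMENT);
    if (publishWithOrderingKey && !hasExperiment) {
      throw new UnsupportedOperationException(
          "Publishing to Pub/Sub with ordering keys requires --experiments="
              + REQUIRED_EXPERIMENT);
    }
  }
}
```

Pipelines that do not request ordering keys are unaffected; only the combination of ordering keys and a missing experiment fails.
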
### Highlights
* **Pub/Sub Ordering Key Support**: Added support for writing messages to
Google Cloud Pub/Sub with ordering keys, allowing downstream consumers to
receive messages in a deterministic order for a given key.
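* **PubsubMessage SchemaCoder**: Introduced a new `PubsubMessageSchemaCoder`
that encodes and decodes `PubsubMessage` objects through a Beam schema, giving
a flexible, forward-compatible alternative to the earlier bespoke coders, each
of which handles a fixed combination of fields (for example,
`PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder`).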
* **DataflowRunner Compatibility**: The DataflowRunner now explicitly throws
an `UnsupportedOperationException` if a pipeline attempts to use Pub/Sub
ordering keys without enabling the `--experiments=enable_custom_pubsub_sink`
flag, guiding users towards the correct configuration.
* **Improved Batching and Sharding**: The `PubsubUnboundedSink` has been
updated to correctly group messages by topic and ordering key for batch
publishing, preventing Pub/Sub service errors related to mixed ordering keys in
a single publish request. Sharding for messages with ordering keys now uses a
deterministic hash based on the ordering key.
* **Message Size Validation**: Updated the Pub/Sub message size validation
logic to accurately account for the size of ordering keys, ensuring messages
adhere to Pub/Sub's resource limits.
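
The batching and sharding rules above can be illustrated without Beam. The sketch below is a simplified, hypothetical model: `Msg` and `OrderingKeyBatching` are illustrative names, CRC32 stands in for the `FarmHash` hash the PR uses in `ShardFn`, the batch key mirrors the `KV<PubsubTopic, String>` map key as a plain `(topic, orderingKey)` pair, and the byte and message-count limits that would flush a real batch early are ignored.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.zip.CRC32;

/** Hypothetical stand-in for a Pub/Sub message (not Beam's PubsubMessage). */
final class Msg {
  final String topic;
  final String orderingKey; // null or "" means "no ordering key"
  final String payload;

  Msg(String topic, String orderingKey, String payload) {
    this.topic = topic;
    this.orderingKey = orderingKey;
    this.payload = payload;
  }
}

final class OrderingKeyBatching {
  /**
   * Deterministic shard for a non-empty ordering key: the same key always maps
   * to the same shard. CRC32 is a stand-in for the FarmHash used by the PR.
   */
  static int shardFor(String orderingKey, int numShards) {
    CRC32 crc = new CRC32();
    crc.update(orderingKey.getBytes(StandardCharsets.UTF_8));
    // getValue() is an unsigned 32-bit value held in a long, so the result is non-negative.
    return (int) (crc.getValue() % numShards);
  }

  /**
   * Groups payloads by (topic, ordering key) so that no batch mixes topics or
   * ordering keys. Arrival order is preserved within each batch.
   */
  static Map<Map.Entry<String, String>, List<String>> groupForPublish(List<Msg> messages) {
    Map<Map.Entry<String, String>, List<String>> batches = new LinkedHashMap<>();
    for (Msg m : messages) {
      // Normalize "no ordering key" to "" so that null and "" share a batch.
      String key = m.orderingKey == null ? "" : m.orderingKey;
      batches
          .computeIfAbsent(
              new AbstractMap.SimpleImmutableEntry<>(m.topic, key), k -> new ArrayList<>())
          .add(m.payload);
    }
    return batches;
  }
}
```

Because the shard depends only on the ordering key, all messages with a given key land on the same shard, and because each batch holds exactly one (topic, ordering key) pair, no single publish request mixes ordering keys.
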
<details>
<summary><b>Changelog</b></summary>
* **CHANGES.md**
    * Added an entry announcing support for writing to Pub/Sub with ordering
      keys.
* **runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java**
    * Added a check to throw `UnsupportedOperationException` if
      `PubsubUnboundedSink` is configured with ordering keys on DataflowRunner
      without the `enable_custom_pubsub_sink` experiment.
* **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java**
    * Added the `ORDERING_KEY_MAX_BYTE_SIZE` constant.
    * Updated `validatePubsubMessageSize` to include the ordering key's size in
      the validation.
    * Added a `usesOrderingKey` field, set through the constructor, and logic
      that logs a warning and drops ordering keys when the sink is not
      configured to use them.
* **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java**
    * Added `getPublishWithOrderingKey()` and `setPublishWithOrderingKey()`
      to the `Write` transform and its builder.
    * Introduced a `withOrderingKey()` method on `PubsubIO.Write` for enabling
      ordering key support.
    * Modified the `expand` method to use `PubsubMessageSchemaCoder` when
      `publishWithOrderingKey` is true and `PubsubMessageWithTopicCoder`
      otherwise.
    * Passed `publishWithOrderingKey` to `PreparePubsubWriteDoFn` and
      `PubsubUnboundedSink`.
    * Changed the `output` map key in `PubsubUnboundedSink.PubsubSink.WriterFn`
      from `PubsubTopic` to `KV<PubsubTopic, String>` to group by ordering key.
* **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java**
    * Removed the `else { protoMessage.setOrderingKey(""); }` block, so that
      either `null` or an empty string can represent "no ordering key".
* **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java**
    * Added a `withOrderingKey()` method to `PubsubMessage`.
* **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageSchemaCoder.java**
    * New file added, providing a `SchemaCoder` for `PubsubMessage` to
      ensure future compatibility.
* **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.java**
    * Updated Javadoc to recommend `PubsubMessageSchemaCoder` for new
      features.
* **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdCoder.java**
    * Updated Javadoc to recommend `PubsubMessageSchemaCoder` for new
      features.
* **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java**
    * Updated Javadoc to recommend `PubsubMessageSchemaCoder` for new
      features.
* **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithMessageIdCoder.java**
    * Updated Javadoc to recommend `PubsubMessageSchemaCoder` for new
      features.
* **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithTopicCoder.java**
    * Updated Javadoc to recommend `PubsubMessageSchemaCoder` for new
      features.
* **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java**
    * Added a `publishBatchWithOrderingKey` field and getter.
    * Modified `ShardFn` to deterministically shard messages with ordering
      keys using `FarmHash`.
    * Refactored `WriterFn` to collect messages in `orderingKeyBatches`, a
      `Map<String, OutgoingData>` keyed by ordering key, so that each publish
      request carries a single ordering key.
    * Updated constructors to include `publishBatchWithOrderingKey`.
* **sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFnTest.java**
    * Added tests for `validatePubsubMessageSize` with ordering keys,
      including size limits.
* **sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java**
    * Updated `PubsubUnboundedSink` constructor calls to include the new
      `publishBatchWithOrderingKey` parameter (set to `false`).
* **sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageSchemaCoderTest.java**
    * New file added, containing tests for `PubsubMessageSchemaCoder`.
* **sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java**
    * Updated the `Stamp` DoFn to support an `orderingKeyFn`.
    * Updated `PubsubUnboundedSink` constructor calls to include the new
      `publishBatchWithOrderingKey` parameter.
    * Added tests for publishing messages with ordering keys, covering an
      incorrect coder, a single message per ordering key, and multiple batches
      per ordering key, with the input data randomly shuffled.
* **sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteIT.java**
    * Added the integration test
      `testBoundedWriteMessageWithAttributesAndOrderingKey` to verify
      end-to-end behavior.
</details>
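
The `PreparePubsubWriteDoFn` changes (ordering-key-aware size validation, and dropping keys with a warning when the sink does not use them) can be sketched in plain Java. This is a simplification under stated assumptions: `PrepareWriteSketch` is hypothetical, the byte accounting simply sums the UTF-8 lengths of the payload, attributes, and ordering key (the real `validatePubsubMessageSize` may count additional overhead), and the limits are passed in as parameters rather than taken from `ORDERING_KEY_MAX_BYTE_SIZE` or Pub/Sub's published quotas.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.logging.Logger;

// Hypothetical sketch; not Beam's PreparePubsubWriteDoFn.
final class PrepareWriteSketch {
  private static final Logger LOG = Logger.getLogger(PrepareWriteSketch.class.getName());

  private static int utf8Length(String s) {
    return s == null ? 0 : s.getBytes(StandardCharsets.UTF_8).length;
  }

  /**
   * Returns the message's approximate size in bytes, counting the ordering key,
   * and throws if either the key or the whole message exceeds its limit.
   */
  static int validateSize(
      byte[] payload,
      Map<String, String> attributes,
      String orderingKey,
      int maxMessageBytes,
      int maxOrderingKeyBytes) {
    int keyBytes = utf8Length(orderingKey);
    if (keyBytes > maxOrderingKeyBytes) {
      throw new IllegalArgumentException(
          "Ordering key is " + keyBytes + " bytes; limit is " + maxOrderingKeyBytes);
    }
    int total = payload.length + keyBytes;
    for (Map.Entry<String, String> e : attributes.entrySet()) {
      total += utf8Length(e.getKey()) + utf8Length(e.getValue());
    }
    if (total > maxMessageBytes) {
      throw new IllegalArgumentException(
          "Message is " + total + " bytes; limit is " + maxMessageBytes);
    }
    return total;
  }

  /** Keeps a non-empty ordering key only if the sink uses ordering keys; otherwise warns and drops it. */
  static String effectiveOrderingKey(String orderingKey, boolean usesOrderingKey) {
    if (orderingKey == null || orderingKey.isEmpty()) {
      return null; // no ordering key on this message
    }
    if (!usesOrderingKey) {
      LOG.warning("Dropping ordering key '" + orderingKey
          + "' because the sink is not configured to use ordering keys");
      return null;
    }
    return orderingKey;
  }
}
```

For example, a 5-byte payload with one single-character attribute pair and the ordering key `key` counts as 10 bytes under this accounting.
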
<details>
<summary><b>Activity</b></summary>
* The author confirmed that ordering keys are preserved with both the direct
runner and Dataflow runner.
* Early feedback from `egalpin` praised the `SchemaCoder` approach for its
potential to improve future compatibility.
* `sjvanrossum` raised initial concerns about DataflowRunner compatibility,
the batching logic for ordering keys, and the sharding strategy, which led to
extensive discussion.
* Reviewers `reuvenlax` and `egalpin` discussed the use case for ordering
keys in Beam, clarifying that they mainly control the order in which downstream
consumers receive messages, not the order in which Beam itself publishes them.
* `sjvanrossum` provided significant input and contributed patches to
address identified issues, including a more ergonomic batching mechanism and
improved sharding for ordering keys.
* The pull request experienced multiple rounds of automated reviewer
reassignments due to periods of inactivity.
* `ahmedabu98` integrated `sjvanrossum`'s patches and requested final
reviews.
* `scwhittle` and `sjvanrossum` engaged in detailed discussions on specific
code changes, covering aspects like error message clarity, constant
definitions, logging for dropped ordering keys, naming conventions, sharding
algorithms, and efficiency of the `SchemaCoder` implementation.
* Codecov reported patch coverage of 74.75728%, with 26 lines not covered.
* The PR was eventually merged by `scwhittle` and `Abacn` after final checks
and test runs.
</details>