gemini-code-assist[bot] commented on PR #31608:
URL: https://github.com/apache/beam/pull/31608#issuecomment-3312933955

   ## Summary of Changes
   
   This pull request extends Apache Beam's Google Cloud Pub/Sub I/O connector so
that messages can be written with ordering keys. This matters for applications
that need strict message order within specific data partitions. The changes
include a new schema-based coder for `PubsubMessage` to improve future
compatibility, batching and sharding in the Pub/Sub sink that account for
ordering keys, and an explicit error on the DataflowRunner when ordering keys are
used without the required experiment flag. Together, these changes let pipelines
publish with ordering keys while respecting Pub/Sub's batching and message size
constraints.
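   The DataflowRunner error handling mentioned above amounts to a fail-fast check. The following is a hypothetical standalone sketch, not the runner's code: `OrderingKeyGuard` and `checkOrderingKeySupport` are illustrative names, and experiments are modeled as a plain `List<String>`. Only the experiment name and the exception type come from this summary.

   ```java
   import java.util.List;

   /** Hypothetical sketch of the runner-side guard; not the actual DataflowRunner code. */
   final class OrderingKeyGuard {
     // Experiment name as given in this PR summary.
     static final String CUSTOM_SINK_EXPERIMENT = "enable_custom_pubsub_sink";

     private OrderingKeyGuard() {}

     /** Throws if a Pub/Sub write asks for ordering keys but the experiment is not enabled. */
     static void checkOrderingKeySupport(boolean publishWithOrderingKey, List<String> experiments) {
       boolean experimentEnabled =
           experiments != null && experiments.contains(CUSTOM_SINK_EXPERIMENT);
       if (publishWithOrderingKey && !experimentEnabled) {
         throw new UnsupportedOperationException(
             "Writing to Pub/Sub with ordering keys requires --experiments="
                 + CUSTOM_SINK_EXPERIMENT);
       }
     }
   }
   ```

   Failing at this point surfaces the misconfiguration as an explicit error instead of letting ordering keys be silently ignored.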
   
   ### Highlights
   
   * **Pub/Sub Ordering Key Support**: Added support for writing messages to
Google Cloud Pub/Sub with ordering keys, so that subscribers with message
ordering enabled receive messages sharing a key in the order they were published.
   * **PubsubMessage SchemaCoder**: Introduced a new `PubsubMessageSchemaCoder` 
to provide a flexible and forward-compatible way to encode and decode 
`PubsubMessage` objects, addressing limitations of previous bespoke coders.
   * **DataflowRunner Compatibility**: The DataflowRunner now explicitly throws 
an `UnsupportedOperationException` if a pipeline attempts to use Pub/Sub 
ordering keys without enabling the `--experiments=enable_custom_pubsub_sink` 
flag, guiding users towards the correct configuration.
   * **Improved Batching and Sharding**: The `PubsubUnboundedSink` has been 
updated to correctly group messages by topic and ordering key for batch 
publishing, preventing Pub/Sub service errors related to mixed ordering keys in 
a single publish request. Sharding for messages with ordering keys now uses a 
deterministic hash based on the ordering key.
   * **Message Size Validation**: Updated the Pub/Sub message size validation 
logic to accurately account for the size of ordering keys, ensuring messages 
adhere to Pub/Sub's resource limits.
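   The batching and sharding rules from the highlights can be illustrated with a small standalone Java sketch. It does not use Beam's classes. `Msg`, `BatchKey`, `batch`, and `shardFor` are hypothetical names. The count-based `maxBatchSize` cap stands in for whatever limits the real `PubsubUnboundedSink` applies, and `String.hashCode()` stands in for the FarmHash-based hash the PR uses.

   ```java
   import java.util.ArrayList;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   /** Hypothetical sketch: one publish batch per (topic, ordering key) pair. */
   final class OrderingKeyBatcher {
     /** Minimal stand-in for an outgoing message; not Beam's PubsubMessage. */
     record Msg(String topic, String orderingKey, String payload) {}

     /** Composite grouping key, similar in spirit to KV<PubsubTopic, String>. */
     record BatchKey(String topic, String orderingKey) {}

     private OrderingKeyBatcher() {}

     /** Groups messages by (topic, key), splitting each group into batches of at most maxBatchSize. */
     static Map<BatchKey, List<List<Msg>>> batch(List<Msg> messages, int maxBatchSize) {
       Map<BatchKey, List<List<Msg>>> batches = new LinkedHashMap<>();
       for (Msg m : messages) {
         // A null key and an empty key both mean "no ordering key".
         String key = m.orderingKey() == null ? "" : m.orderingKey();
         List<List<Msg>> forKey =
             batches.computeIfAbsent(new BatchKey(m.topic(), key), k -> new ArrayList<>());
         // Start a new batch when the group has none yet or the last one is full.
         if (forKey.isEmpty() || forKey.get(forKey.size() - 1).size() >= maxBatchSize) {
           forKey.add(new ArrayList<>());
         }
         forKey.get(forKey.size() - 1).add(m);
       }
       return batches;
     }

     /** Deterministic shard for an ordering key; String.hashCode() stands in for FarmHash. */
     static int shardFor(String orderingKey, int numShards) {
       return Math.floorMod(orderingKey.hashCode(), numShards);
     }
   }
   ```

   Messages keep their arrival order within each group, so every batch carries exactly one topic and one ordering key. Because the shard depends only on the key, all messages with the same key are routed to the same shard.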
   
   <details>
   <summary><b>Changelog</b></summary>
   
   * **CHANGES.md**
       * Added an entry for Pub/Sub writes with ordering keys.
   * **runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java**
       * Added a check to throw `UnsupportedOperationException` if 
`PubsubUnboundedSink` is configured with ordering keys on DataflowRunner 
without the `enable_custom_pubsub_sink` experiment.
   * **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java**
       * Added `ORDERING_KEY_MAX_BYTE_SIZE` constant.
       * Updated `validatePubsubMessageSize` to include ordering key size in 
validation.
       * Added a `usesOrderingKey` constructor parameter. When the sink is not
configured to use ordering keys, they are dropped and a warning is logged.
   * **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java**
       * Added `getPublishWithOrderingKey()` and `setPublishWithOrderingKey()` 
to `Write` transform builder.
       * Introduced `withOrderingKey()` method to `PubsubIO.Write` for enabling 
ordering key support.
       * Modified `expand` method to use `PubsubMessageSchemaCoder` when 
`publishWithOrderingKey` is true, otherwise `PubsubMessageWithTopicCoder`.
       * Passed `publishWithOrderingKey` to `PreparePubsubWriteDoFn` and 
`PubsubUnboundedSink`.
       * Changed `output` map key in `PubsubUnboundedSink.PubsubSink.WriterFn` 
from `PubsubTopic` to `KV<PubsubTopic, String>` to group by ordering key.
   * **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java**
       * Removed `else { protoMessage.setOrderingKey(""); }` block, allowing 
`null` or empty string for no ordering key.
   * **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java**
       * Added `withOrderingKey()` method to `PubsubMessage`.
   * **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageSchemaCoder.java**
       * New file added, providing a `SchemaCoder` for `PubsubMessage` to 
ensure future compatibility.
   * **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.java**
       * Updated Javadoc to recommend `PubsubMessageSchemaCoder` for new 
features.
   * **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdCoder.java**
       * Updated Javadoc to recommend `PubsubMessageSchemaCoder` for new 
features.
   * **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java**
       * Updated Javadoc to recommend `PubsubMessageSchemaCoder` for new 
features.
   * **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithMessageIdCoder.java**
       * Updated Javadoc to recommend `PubsubMessageSchemaCoder` for new 
features.
   * **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithTopicCoder.java**
       * Updated Javadoc to recommend `PubsubMessageSchemaCoder` for new 
features.
   * **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java**
       * Added `publishBatchWithOrderingKey` field and getter.
       * Modified `ShardFn` to deterministically shard messages with ordering 
keys using `FarmHash`.
       * Refactored `WriterFn` to use `Map<String, OutgoingData>` for 
`orderingKeyBatches` to group messages by ordering key before publishing.
       * Updated constructors to include `publishBatchWithOrderingKey`.
   * **sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFnTest.java**
       * Added tests for `validatePubsubMessageSize` with ordering keys, 
including size limits.
   * **sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java**
       * Updated `PubsubUnboundedSink` constructor calls to include the new 
`publishBatchWithOrderingKey` parameter (set to `false`).
   * **sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageSchemaCoderTest.java**
       * New file added, containing tests for `PubsubMessageSchemaCoder`.
   * **sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java**
       * Updated `Stamp` DoFn to support `orderingKeyFn`.
       * Updated `PubsubUnboundedSink` constructor calls to include the new 
`publishBatchWithOrderingKey` parameter.
       * Added tests for publishing messages with ordering keys, including 
scenarios with wrong coders, single messages per ordering key, and multiple 
batches by ordering key, with random shuffling of input data.
   * **sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteIT.java**
       * Added the integration test
`testBoundedWriteMessageWithAttributesAndOrderingKey` to verify end-to-end
behavior.
   </details>
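   The `PreparePubsubWriteDoFn` changes (counting the ordering key toward the size limit, and dropping keys when the sink is not configured for them) can be sketched as follows. This is not the PR's code: `PubsubSizeCheck`, `validate`, and `effectiveOrderingKey` are hypothetical names. The size accounting is simplified to payload plus UTF-8 attribute and key bytes, and the 10 MiB message limit and 1,024-byte ordering key limit are assumptions to check against the current Pub/Sub quotas documentation.

   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.Map;

   /** Hypothetical sketch of size validation that counts the ordering key. */
   final class PubsubSizeCheck {
     // Assumed limits for illustration only; verify against Pub/Sub's published quotas.
     static final int MAX_MESSAGE_BYTES = 10 * 1024 * 1024;
     static final int ORDERING_KEY_MAX_BYTE_SIZE = 1024;

     private PubsubSizeCheck() {}

     /** Returns the simplified message size in bytes, or throws if a limit is exceeded. */
     static int validate(byte[] payload, Map<String, String> attributes, String orderingKey) {
       int size = payload.length;
       for (Map.Entry<String, String> e : attributes.entrySet()) {
         size += e.getKey().getBytes(StandardCharsets.UTF_8).length;
         size += e.getValue().getBytes(StandardCharsets.UTF_8).length;
       }
       if (orderingKey != null && !orderingKey.isEmpty()) {
         int keyBytes = orderingKey.getBytes(StandardCharsets.UTF_8).length;
         if (keyBytes > ORDERING_KEY_MAX_BYTE_SIZE) {
           throw new IllegalArgumentException(
               "Ordering key is " + keyBytes + " bytes; limit is " + ORDERING_KEY_MAX_BYTE_SIZE);
         }
         // The ordering key now counts toward the total message size.
         size += keyBytes;
       }
       if (size > MAX_MESSAGE_BYTES) {
         throw new IllegalArgumentException(
             "Message is " + size + " bytes; limit is " + MAX_MESSAGE_BYTES);
       }
       return size;
     }

     /** Drops the key (with a warning) when the sink is not configured to use ordering keys. */
     static String effectiveOrderingKey(String orderingKey, boolean usesOrderingKey) {
       if (!usesOrderingKey && orderingKey != null && !orderingKey.isEmpty()) {
         System.err.println("WARN: dropping ordering key; sink not configured for ordering keys");
         return null;
       }
       return orderingKey;
     }
   }
   ```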
   
   <details>
   <summary><b>Activity</b></summary>
   
   * The author confirmed that ordering keys are preserved on both the
DirectRunner and the DataflowRunner.
   * Early feedback from `egalpin` praised the `SchemaCoder` approach for its 
potential to improve future compatibility.
   * Initial concerns were raised by `sjvanrossum` regarding DataflowRunner 
compatibility, batching logic for ordering keys, and sharding strategy, leading 
to extensive discussion.
   * Reviewers `reuvenlax` and `egalpin` discussed the use case for ordering 
keys in Beam, clarifying that they are primarily for downstream consumption 
ordering rather than Beam's internal publishing order.
   * `sjvanrossum` provided significant input and contributed patches to 
address identified issues, including a more ergonomic batching mechanism and 
improved sharding for ordering keys.
   * The pull request experienced multiple rounds of automated reviewer 
reassignments due to periods of inactivity.
   * `ahmedabu98` integrated `sjvanrossum`'s patches and requested final 
reviews.
   * `scwhittle` and `sjvanrossum` engaged in detailed discussions on specific 
code changes, covering aspects like error message clarity, constant 
definitions, logging for dropped ordering keys, naming conventions, sharding 
algorithms, and efficiency of the `SchemaCoder` implementation.
   * Codecov reported 74.76% patch coverage, with 26 lines not covered.
   * The PR was eventually merged by `scwhittle` and `Abacn` after final checks 
and test runs.
   </details>
   
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
