tvalentyn commented on code in PR #37345:
URL: https://github.com/apache/beam/pull/37345#discussion_r2996712381


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -1727,7 +1727,10 @@ public void startBundle(StartBundleContext c) throws 
IOException {
         this.pubsubClient =
             getPubsubClientFactory()
                 .newClient(
-                    getTimestampAttribute(), null, 
c.getPipelineOptions().as(PubsubOptions.class));
+                    getTimestampAttribute(),
+                    null,
+                    c.getPipelineOptions().as(PubsubOptions.class),
+                    Write.this.getPubsubRootUrl());

Review Comment:
   for my education, why was this necessary?



##########
sdks/python/apache_beam/io/external/gcp/pubsub.py:
##########
@@ -150,18 +152,24 @@ def __init__(
         in a ReadFromPubSub PTransform to deduplicate messages.
       timestamp_attribute: If set, will set an attribute for each Cloud Pub/Sub
         message with the given name and the message's publish time as the 
value.
+      publish_with_ordering_key: If True, enables ordering key support when
+        publishing messages. The ordering key must be set on each
+        PubsubMessage via the ``ordering_key`` attribute. Requires
+        messages to be routed to the same region.
     """
     self.params = WriteToPubsubSchema(
         topic=topic,
         id_label=id_label,
         # with_attributes=with_attributes,
-        timestamp_attribute=timestamp_attribute)
+        timestamp_attribute=timestamp_attribute,
+        publish_with_ordering_key=publish_with_ordering_key)
     self.expansion_service = expansion_service
     self.with_attributes = with_attributes
 
   def expand(self, pvalue):
     if self.with_attributes:
-      pcoll = pvalue | 'ToProto' >> Map(pubsub.WriteToPubSub.to_proto_str)
+      pcoll = pvalue | 'ToProto' >> Map(

Review Comment:
   Re: `Map(pubsub.WriteToPubSub.to_proto_str)`  - was this a typo that was 
meant to be `Map(pubsub.WriteToPubSub.message_to_proto_str)` ? The latter seems 
to have some type checking which probably wouldn't hurt.



##########
sdks/python/apache_beam/io/external/gcp/pubsub.py:
##########
@@ -150,18 +152,24 @@ def __init__(
         in a ReadFromPubSub PTransform to deduplicate messages.
       timestamp_attribute: If set, will set an attribute for each Cloud Pub/Sub
         message with the given name and the message's publish time as the 
value.
+      publish_with_ordering_key: If True, enables ordering key support when
+        publishing messages. The ordering key must be set on each
+        PubsubMessage via the ``ordering_key`` attribute. Requires

Review Comment:
   > Requires
           messages to be routed to the same region.
   
   What does this mean?



##########
sdks/python/apache_beam/io/gcp/pubsub_integration_test.py:
##########
@@ -305,6 +305,51 @@ def test_batch_write_with_attributes(self):
     """Test WriteToPubSub in batch mode with attributes."""
     self._test_batch_write(with_attributes=True)
 
+  @pytest.mark.it_postcommit

Review Comment:
   this will work only in direct runner, but not in dataflow runner - correct?



##########
sdks/python/apache_beam/io/gcp/pubsub_integration_test.py:
##########
@@ -305,6 +305,51 @@ def test_batch_write_with_attributes(self):
     """Test WriteToPubSub in batch mode with attributes."""
     self._test_batch_write(with_attributes=True)
 
+  @pytest.mark.it_postcommit

Review Comment:
   also wondering, if we can have a warning for Dataflow that that advises 
users to use xlang version if they wish to use ordering key. IIRC we have 
something like that in Java SDK?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to