scwhittle commented on code in PR #31608:
URL: https://github.com/apache/beam/pull/31608#discussion_r1756387038


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java:
##########
@@ -66,6 +68,20 @@ static int validatePubsubMessageSize(PubsubMessage message, int maxPublishBatchS
     }
     int totalSize = payloadSize;
 
+    @Nullable String orderingKey = message.getOrderingKey();
+    if (orderingKey != null) {
+      int orderingKeySize = orderingKey.getBytes(StandardCharsets.UTF_8).length;
+      if (orderingKeySize > PUBSUB_MESSAGE_ATTRIBUTE_MAX_VALUE_BYTES) {

Review Comment:
   How about a new constant for the ordering key length? It also appears to be 1K,
   but one limit could be increased in the future without the other.
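
   A minimal sketch of what a separate constant could look like. The constant name `PUBSUB_MESSAGE_ORDERING_KEY_MAX_BYTES`, the helper method, and the use of `IllegalArgumentException` are assumptions for illustration only; the PR's validation lives in `validatePubsubMessageSize` and may throw a different exception type.

```java
import java.nio.charset.StandardCharsets;

final class OrderingKeySizeSketch {
  // Existing limit for attribute values, as used in the diff above.
  static final int PUBSUB_MESSAGE_ATTRIBUTE_MAX_VALUE_BYTES = 1024;

  // Hypothetical separate constant: same value today, but it can change independently.
  static final int PUBSUB_MESSAGE_ORDERING_KEY_MAX_BYTES = 1024;

  /**
   * Returns the UTF-8 encoded size of the ordering key in bytes, or 0 if the key is null.
   * Throws if the key exceeds the ordering key limit.
   */
  static int validateOrderingKeySize(String orderingKey) {
    if (orderingKey == null) {
      return 0;
    }
    int orderingKeySize = orderingKey.getBytes(StandardCharsets.UTF_8).length;
    if (orderingKeySize > PUBSUB_MESSAGE_ORDERING_KEY_MAX_BYTES) {
      // Assumption: the real code may use a size-limit-specific exception instead.
      throw new IllegalArgumentException(
          String.format(
              "Pubsub message ordering key of length %d exceeds maximum of %d bytes",
              orderingKeySize, PUBSUB_MESSAGE_ORDERING_KEY_MAX_BYTES));
    }
    return orderingKeySize;
  }
}
```

   With this split, raising only one of the two limits later is a one-line change, and the error message names the limit that was actually exceeded.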



##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java:
##########
@@ -2033,6 +2034,16 @@ private static void translate(
         PubsubUnboundedSink overriddenTransform,
         StepTranslationContext stepContext,
         PCollection input) {
+      if (overriddenTransform.getPublishBatchWithOrderingKey()) {
+        throw new UnsupportedOperationException(
+            String.format(
+                "%s does not currently support publishing with ordering keys. "

Review Comment:
   nit: this may read better as "The Dataflow runner does not " instead of the class
   name, which won't mean much to a user since that class is inserted automatically.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java:
##########
@@ -175,6 +193,16 @@ public void process(
          .add("pubsub", "topic", PubsubClient.topicPathFromPath(topic).getDataCatalogSegments());
       reportedLineage = topic;
     }
+    // TODO: Remove this check once Dataflow's native sink supports ordering keys.

Review Comment:
   Should we remove this? It could break existing users who set an ordering key
   that is currently ignored while the message is still published. If we document
   that the ordering key is only published when the withOrderingKey option is used,
   then I think it would be better to drop the key here and log that it is being
   dropped.
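
   A rough sketch of the drop-and-log behavior, assuming a simplified stand-in for the message type. The `Message` class, the `prepareForPublish` method, and the `publishWithOrderingKey` flag are hypothetical names for illustration and are not Beam APIs.

```java
import java.util.logging.Logger;

final class DropOrderingKeySketch {
  private static final Logger LOG = Logger.getLogger(DropOrderingKeySketch.class.getName());

  /** Hypothetical, simplified stand-in for PubsubMessage. */
  static final class Message {
    final byte[] payload;
    final String orderingKey; // null when no ordering key is set

    Message(byte[] payload, String orderingKey) {
      this.payload = payload;
      this.orderingKey = orderingKey;
    }
  }

  /**
   * Returns the message unchanged when ordering keys are enabled or no key is set.
   * Otherwise logs a warning and returns a copy without the ordering key, so the
   * message is still published instead of failing the pipeline.
   */
  static Message prepareForPublish(Message message, boolean publishWithOrderingKey) {
    if (publishWithOrderingKey || message.orderingKey == null) {
      return message;
    }
    LOG.warning(
        "Dropping ordering key from message because publishing with ordering keys is not enabled.");
    return new Message(message.payload, null);
  }
}
```

   In a real pipeline the warning would likely need rate limiting, since logging once per message could be very noisy.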



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -1621,51 +1646,44 @@ public void startBundle(StartBundleContext c) throws IOException {
       public void processElement(@Element PubsubMessage message, @Timestamp Instant timestamp)
           throws IOException, SizeLimitExceededException {
         // Validate again here just as a sanity check.
+        // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800
         PreparePubsubWriteDoFn.validatePubsubMessageSize(message, maxPublishBatchByteSize);
-        byte[] payload = message.getPayload();
-        int messageSize = payload.length;
+        // NOTE: The record id is always null.

Review Comment:
   Could you add an explanation of why the record id is always null?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
