algirdas-k commented on issue #20950:
URL: https://github.com/apache/beam/issues/20950#issuecomment-1367158844

   It seems that much of the work needed to solve this is already done:
   - The [code link](https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java#L44) in the issue description points to an empty line.
   - Beam `PubsubMessage` [has](https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java#L65-L70) set/get for the ordering key.
   - A coder exists, `PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder`, which [encodes and decodes the ordering key](https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.java#L67).
   - The pipeline (at least the unbounded sink) passes the ordering key:
     - During the [mapping](https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java#L30) from the Beam `PubsubMessage` to the GCP `proto` `PubsubMessage` and then to `byte[]`, the ordering key is [mapped](https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java#L44-L48) (via [PubsubUnboundedSink.expand](https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java#L484-L487)).
     - Later, when `PubsubUnboundedSink` [writes](https://github.com/apache/beam/blob/v2.43.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java#L499-L527) these `byte[]` to `Pub/Sub`, it [uses](https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java#L514) [OutgoingMessageCoder](https://github.com/apache/beam/blob/v2.43.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java#L98-L118), which does not lose the ordering key, as it encodes the whole proto message.
   
   So technically everything is in place and should work, as long as the correct coder (one that maps the ordering key) is used.
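
To illustrate why the coder choice matters, here is a minimal, self-contained sketch. It does not use Beam at all: `Message`, `PayloadOnlyCoder`, and `PayloadAndOrderingKeyCoder` are hypothetical stand-ins invented for this example, not Beam's `PubsubMessage` or its coders. It only shows the general effect that a coder which does not encode a field cannot give it back after a round trip.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-ins only; these are NOT Beam's PubsubMessage or its coders. */
public class OrderingKeyCoderSketch {

  /** Minimal message: a payload plus an optional ordering key. */
  static final class Message {
    final byte[] payload;
    final String orderingKey; // may be null

    Message(byte[] payload, String orderingKey) {
      this.payload = payload;
      this.orderingKey = orderingKey;
    }
  }

  interface MessageCoder {
    byte[] encode(Message m) throws IOException;

    Message decode(byte[] bytes) throws IOException;
  }

  /** Encodes only the payload, so the ordering key cannot survive a round trip. */
  static final class PayloadOnlyCoder implements MessageCoder {
    @Override
    public byte[] encode(Message m) throws IOException {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeInt(m.payload.length);
      out.write(m.payload);
      out.flush();
      return bytes.toByteArray();
    }

    @Override
    public Message decode(byte[] bytes) throws IOException {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
      byte[] payload = new byte[in.readInt()];
      in.readFully(payload);
      return new Message(payload, null); // the key was never written
    }
  }

  /** Encodes the payload and, when present, the ordering key. */
  static final class PayloadAndOrderingKeyCoder implements MessageCoder {
    @Override
    public byte[] encode(Message m) throws IOException {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeInt(m.payload.length);
      out.write(m.payload);
      out.writeBoolean(m.orderingKey != null); // presence flag
      if (m.orderingKey != null) {
        out.writeUTF(m.orderingKey);
      }
      out.flush();
      return bytes.toByteArray();
    }

    @Override
    public Message decode(byte[] bytes) throws IOException {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
      byte[] payload = new byte[in.readInt()];
      in.readFully(payload);
      String orderingKey = in.readBoolean() ? in.readUTF() : null;
      return new Message(payload, orderingKey);
    }
  }

  /** Simulates a message crossing a serialization boundary between pipeline stages. */
  static Message roundTrip(MessageCoder coder, Message m) throws IOException {
    return coder.decode(coder.encode(m));
  }

  public static void main(String[] args) throws IOException {
    Message original =
        new Message("hello".getBytes(StandardCharsets.UTF_8), "customer-42");
    // prints "null": the key is dropped
    System.out.println(roundTrip(new PayloadOnlyCoder(), original).orderingKey);
    // prints "customer-42": the key is preserved
    System.out.println(roundTrip(new PayloadAndOrderingKeyCoder(), original).orderingKey);
  }
}
```

If a runner re-encodes elements between steps with a coder like the first one, the key would be gone before the sink ever sees it. Whether that is what happens on `DataflowRunner` is only a guess on my side.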
   
   However, the ordering key is written when running on `DirectRunner`, but no ordering key is written when running on `DataflowRunner`.
   
   ---
   
   We can reproduce this strange behavior:
   
   ### Case 1
   Using a custom coder + `DirectRunner` -> the ordering key is written to Pub/Sub
   ```java
   messages.apply(
       PubsubIO.writeMessages().to(topic)
   );
   ```
   ### Case 2
   Deploying the same pipeline from **Case 1** on `DataflowRunner` -> the ordering key is missing
   
   ### Case 3
   Copy/pasting the `PubsubUnboundedSink` code into our namespace and using it directly to write to Pub/Sub, running on `DataflowRunner` -> **the ordering key is written to Pub/Sub**
   ```java
   messages.apply(
       "WriteMessages",
       new PubsubUnboundedSinkCopy(
            PubsubJsonClient.FACTORY,
            ...          
   ```
   
   Maybe someone could give some insight:
   - What pieces are missing for `PubsubIO.writeMessages()` to work on `DataflowRunner`?
   - Or why do we get ordering key writes to Pub/Sub when we copy/paste the sink?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

